You are viewing a plain-text version of this content. The canonical link to it (a "here" hyperlink in the original page) was not preserved in this copy.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/10/09 09:07:29 UTC

tez git commit: TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master c6c9f6ecd -> f9d15c869


TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f9d15c86
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f9d15c86
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f9d15c86

Branch: refs/heads/master
Commit: f9d15c8695de7975817631b051450336bc5eadee
Parents: c6c9f6e
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Oct 9 15:07:17 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Oct 9 15:07:17 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                           |  4 ++++
 .../apache/tez/dag/app/TaskCommunicatorManager.java   |  1 +
 .../org/apache/tez/runtime/task/TaskReporter.java     | 14 ++++++++++----
 .../org/apache/tez/test/TestExceptionPropagation.java | 12 +++++++++++-
 4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8e5da31..25bb1d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-1788. Allow vertex level disabling of speculation
   TEZ-2868. Fix setting Caller Context in Tez Examples.
   TEZ-2860. NPE in DAGClientImpl.
@@ -200,6 +201,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2868. Fix setting Caller Context in Tez Examples.
   TEZ-2860. NPE in DAGClientImpl.
   TEZ-2855. Fix a potential NPE while routing VertexManager events.
@@ -475,6 +477,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
@@ -706,6 +709,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-2834. Make Tez preemption resilient to incorrect free resource reported

http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 0bc02dc..8c17c2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -241,6 +241,7 @@ public class TaskCommunicatorManager extends AbstractService implements
         tezEvent.setEventReceivedTime(currTime);
         final EventType eventType = tezEvent.getEventType();
         if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+          // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
           taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
               (TaskStatusUpdateEvent) tezEvent.getEvent());
         } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 6705020..263300e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -374,15 +374,21 @@ public class TaskReporter implements TaskReporterInterface {
         EventMetaData srcMeta) throws IOException, TezException {
       // Ensure only one final event is ever sent.
       if (!finalEventQueued.getAndSet(true)) {
-        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+        List<TezEvent> tezEvents = new ArrayList<TezEvent>();
         if (diagnostics == null) {
           diagnostics = ExceptionUtils.getStackTrace(t);
         } else {
           diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
         }
-        TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-            srcMeta == null ? updateEventMetadata : srcMeta);
-        return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+        tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+            srcMeta == null ? updateEventMetadata : srcMeta));
+        try {
+          tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata));
+        } catch (Exception e) {
+          // Counter may exceed limitation
+          LOG.warn("Error when get constructing TaskStatusUpdateEvent");
+        }
+        return !heartbeat(tezEvents).shouldDie;
       } else {
         LOG.warn("A final task state event has already been sent. Not sending again");
         return askedToDie.get();

http://git-wip-us.apache.org/repos/asf/tez/blob/f9d15c86/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index b8b46cb..7d88fdf 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -225,7 +225,11 @@ public class TestExceptionPropagation {
         DAGStatus dagStatus = dagClient.waitForCompletion();
         String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
         LOG.info("Diagnostics:" + diagnostics);
-        assertTrue(diagnostics.contains(exLocation.name()));
+        if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+          assertTrue(diagnostics.contains("Too many counters"));
+        } else {
+          assertTrue(diagnostics.contains(exLocation.name()));
+        }
       }
     } finally {
       stopSessionClient();
@@ -302,6 +306,7 @@ public class TestExceptionPropagation {
     // PROCESSOR_HANDLE_EVENTS
     PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR,
     PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION, PROCESSOR_INITIALIZE_EXCEPTION,
+    PROCESSOR_COUNTER_EXCEEDED,
 
     // VM
     VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED,
@@ -626,6 +631,11 @@ public class TestExceptionPropagation {
         throw new Error(this.exLocation.name());
       } else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) {
         throw new Exception(this.exLocation.name());
+      } else if (this.exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+        // simulate the counter limitation exceeded
+        for (int i=0;i< TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT+1; ++i) {
+          getContext().getCounters().findCounter("mycounter", "counter_"+i).increment(1);
+        }
       }
     }