You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/02 01:04:30 UTC

tez git commit: TEZ-2394. Issues when there is an error in VertexManager callbacks (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 1a5317578 -> 9f090279d


TEZ-2394. Issues when there is an error in VertexManager callbacks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9f090279
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9f090279
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9f090279

Branch: refs/heads/master
Commit: 9f090279d269fbcd63b357781318eb2163c82762
Parents: 1a53175
Author: Bikas Saha <bi...@apache.org>
Authored: Fri May 1 16:04:21 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri May 1 16:04:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/StateChangeNotifier.java    |  4 +-
 .../tez/dag/app/dag/impl/VertexManager.java     | 77 ++++++++++++++------
 3 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c718ed..609db3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2394. Issues when there is an error in VertexManager callbacks
   TEZ-2386. Tez UI: Inconsistent usage of icon colors
   TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job.
   TEZ-2360. per-io counters flag should generate both overall and per-edge counters

http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 260cbf3..990bdea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -71,7 +71,7 @@ public class StateChangeNotifier {
       this.listener = listener;
     }
     
-    void sentUpdate() {
+    void sendUpdate() {
       listener.onStateUpdated(update);
     }
     
@@ -105,7 +105,7 @@ public class StateChangeNotifier {
             continue;
           }
           try {
-            event.sentUpdate();
+            event.sendUpdate();
             processedEventFromQueue();
           } catch (Exception e) {
             // TODO send user code exception - TEZ-2332

http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 1ed42fc..945d9ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -58,6 +58,8 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
@@ -432,10 +434,22 @@ public class VertexManager {
     }
     if (eventInFlight.compareAndSet(false, true)) {
       // no event was in flight
+      // ensures only 1 event is in flight
       VertexManagerEvent e = eventQueue.poll();
-      Preconditions.checkState(e != null);
-      ListenableFuture<Void> future = execService.submit(e);
-      Futures.addCallback(future, e.getCallback());
+      if (e != null) {
+        ListenableFuture<Void> future = execService.submit(e);
+        Futures.addCallback(future, e.getCallback());
+      } else {
+        // This may happen. Let's say Callback succeeded on threadA. It set eventInFlight to false 
+        // and called tryScheduleNextEvent() and found queue not empty but got paused before it 
+        // could check eventInFlight.compareAndSet(). Another thread managed to dequeue the event 
+        // and schedule a callback. That callback succeeded and set eventInFlight to false, found 
+        // the queue empty and completed. Now threadA woke up and successfully did compareAndSet()
+        // tried to dequeue an event and got null.
+        // This could also happen if there is a bug and we manage to schedule more than 1 callback;
+        // verify that is not the case.
+        Preconditions.checkState(eventInFlight.compareAndSet(true, false));
+      }
     }
   }
 
@@ -484,36 +498,55 @@ public class VertexManager {
 
     @Override
     public void onFailure(Throwable t) {
-      // stop further event processing
-      pluginFailed.set(true);
-      eventQueue.clear();
-      // catch real root cause of failure, it would throw UndeclaredThrowableException
-      // if using UGI.doAs
-      if (t instanceof UndeclaredThrowableException) {
-        t = t.getCause();
+      try {
+        Preconditions.checkState(eventInFlight.get());
+        // stop further event processing
+        pluginFailed.set(true);
+        eventQueue.clear();
+        // catch real root cause of failure, it would throw UndeclaredThrowableException
+        // if using UGI.doAs
+        if (t instanceof UndeclaredThrowableException) {
+          t = t.getCause();
+        }
+        Preconditions.checkState(appContext != null);
+        Preconditions.checkState(managedVertex != null);
+        // state change must be triggered via an event transition
+        appContext.getEventHandler().handle(
+            new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
+                new AMUserCodeException(Source.VertexManager, t)));
+        // enqueue no further events due to user code error
+      } catch (Exception e) {
+        sendInternalError(e);
       }
-      Preconditions.checkState(appContext != null);
-      Preconditions.checkState(managedVertex != null);
-      // state change must be triggered via an event transition
-      appContext.getEventHandler().handle(
-          new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
-              new AMUserCodeException(Source.VertexManager, t)));
-      // enqueue no further events due to user code error
     }
     
     @Override
     public void onSuccess(Void result) {
-      Preconditions.checkState(eventInFlight.get());
-      eventInFlight.set(false);
-      tryScheduleNextEvent();
+      try {
+        onSuccessDerived(result);
+        Preconditions.checkState(eventInFlight.compareAndSet(true, false));
+        tryScheduleNextEvent();
+      } catch (Exception e) {
+        sendInternalError(e);
+      }
+    }
+    
+    protected void onSuccessDerived(Void result) {
+    }
+    
+    private void sendInternalError(Exception e) {
+      // fail the DAG so that we don't hang
+      // state change must be triggered via an event transition
+      LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e);
+      appContext.getEventHandler().handle(
+          (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR)));
     }
   }
   
   private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback {
 
     @Override
-    public void onSuccess(Void result) {
-      super.onSuccess(result);
+    protected void onSuccessDerived(Void result) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("vertex:" + managedVertex.getLogIdentifier()
             + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:"