You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/02 01:04:30 UTC
tez git commit: TEZ-2394. Issues when there is an error in
VertexManager callbacks (bikas)
Repository: tez
Updated Branches:
refs/heads/master 1a5317578 -> 9f090279d
TEZ-2394. Issues when there is an error in VertexManager callbacks (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9f090279
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9f090279
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9f090279
Branch: refs/heads/master
Commit: 9f090279d269fbcd63b357781318eb2163c82762
Parents: 1a53175
Author: Bikas Saha <bi...@apache.org>
Authored: Fri May 1 16:04:21 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri May 1 16:04:21 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/StateChangeNotifier.java | 4 +-
.../tez/dag/app/dag/impl/VertexManager.java | 77 ++++++++++++++------
3 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c718ed..609db3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-2394. Issues when there is an error in VertexManager callbacks
TEZ-2386. Tez UI: Inconsistent usage of icon colors
TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job.
TEZ-2360. per-io counters flag should generate both overall and per-edge counters
http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 260cbf3..990bdea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -71,7 +71,7 @@ public class StateChangeNotifier {
this.listener = listener;
}
- void sentUpdate() {
+ void sendUpdate() {
listener.onStateUpdated(update);
}
@@ -105,7 +105,7 @@ public class StateChangeNotifier {
continue;
}
try {
- event.sentUpdate();
+ event.sendUpdate();
processedEventFromQueue();
} catch (Exception e) {
// TODO send user code exception - TEZ-2332
http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 1ed42fc..945d9ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -58,6 +58,8 @@ import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
@@ -432,10 +434,22 @@ public class VertexManager {
}
if (eventInFlight.compareAndSet(false, true)) {
// no event was in flight
+ // ensures only 1 event is in flight
VertexManagerEvent e = eventQueue.poll();
- Preconditions.checkState(e != null);
- ListenableFuture<Void> future = execService.submit(e);
- Futures.addCallback(future, e.getCallback());
+ if (e != null) {
+ ListenableFuture<Void> future = execService.submit(e);
+ Futures.addCallback(future, e.getCallback());
+ } else {
+ // This may happen. Let's say Callback succeeded on threadA. It set eventInFlight to false
+ // and called tryScheduleNextEvent() and found queue not empty but got paused before it
+ // could check eventInFlight.compareAndSet(). Another thread managed to dequeue the event
+ // and schedule a callback. That callback succeeded and set eventInFlight to false, found
+ // the queue empty and completed. Now threadA woke up and successfully did compareAndSet(),
+ // tried to dequeue an event and got null.
+ // This could also happen if there is a bug and we manage to schedule more than 1 callback;
+ // verify that is not the case.
+ Preconditions.checkState(eventInFlight.compareAndSet(true, false));
+ }
}
}
@@ -484,36 +498,55 @@ public class VertexManager {
@Override
public void onFailure(Throwable t) {
- // stop further event processing
- pluginFailed.set(true);
- eventQueue.clear();
- // catch real root cause of failure, it would throw UndeclaredThrowableException
- // if using UGI.doAs
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
+ try {
+ Preconditions.checkState(eventInFlight.get());
+ // stop further event processing
+ pluginFailed.set(true);
+ eventQueue.clear();
+ // catch real root cause of failure, it would throw UndeclaredThrowableException
+ // if using UGI.doAs
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ Preconditions.checkState(appContext != null);
+ Preconditions.checkState(managedVertex != null);
+ // state change must be triggered via an event transition
+ appContext.getEventHandler().handle(
+ new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
+ new AMUserCodeException(Source.VertexManager, t)));
+ // enqueue no further events due to user code error
+ } catch (Exception e) {
+ sendInternalError(e);
}
- Preconditions.checkState(appContext != null);
- Preconditions.checkState(managedVertex != null);
- // state change must be triggered via an event transition
- appContext.getEventHandler().handle(
- new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
- new AMUserCodeException(Source.VertexManager, t)));
- // enqueue no further events due to user code error
}
@Override
public void onSuccess(Void result) {
- Preconditions.checkState(eventInFlight.get());
- eventInFlight.set(false);
- tryScheduleNextEvent();
+ try {
+ onSuccessDerived(result);
+ Preconditions.checkState(eventInFlight.compareAndSet(true, false));
+ tryScheduleNextEvent();
+ } catch (Exception e) {
+ sendInternalError(e);
+ }
+ }
+
+ protected void onSuccessDerived(Void result) {
+ }
+
+ private void sendInternalError(Exception e) {
+ // fail the DAG so that we don't hang
+ // state change must be triggered via an event transition
+ LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e);
+ appContext.getEventHandler().handle(
+ (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR)));
}
}
private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback {
@Override
- public void onSuccess(Void result) {
- super.onSuccess(result);
+ protected void onSuccessDerived(Void result) {
if (LOG.isDebugEnabled()) {
LOG.debug("vertex:" + managedVertex.getLogIdentifier()
+ "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:"