You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:19 UTC
[10/50] [abbrv] tez git commit: TEZ-1703. Exception handling for
InputInitializer. (zjffdu)
TEZ-1703. Exception handling for InputInitializer. (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ec29425
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ec29425
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ec29425
Branch: refs/heads/TEZ-8
Commit: 4ec29425d63decfd8de4e1528f043271cf7cb3b2
Parents: 7f8fc75
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Oct 31 09:55:22 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Oct 31 09:55:22 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/runtime/api/InputInitializer.java | 3 +-
.../app/dag/RootInputInitializerManager.java | 30 +++-
.../dag/event/VertexEventRootInputFailed.java | 7 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 4 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 52 ++++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 178 ++++++++++++++++++-
.../tez/test/TestExceptionPropagation.java | 138 +++++++++++---
8 files changed, 362 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b2dc65..61b380e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -92,6 +92,7 @@ ALL CHANGES:
TEZ-1716. Additional ATS data for UI.
TEZ-1722. DAG should be related to Application Id in ATS data.
TEZ-1711. Don't cache outputSpecList in VertexImpl.getOutputSpecList(taskIndex)
+ TEZ-1703. Exception handling for InputInitializer.
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 7b22b62..d9d6517 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -101,7 +101,8 @@ public abstract class InputInitializer {
* @param stateUpdate an event indicating the name of the vertex, and it's updated state.
* Additional information may be available for specific events, Look at the
* type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+ * @throws Exception
*/
- public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 1f7a83f..bdd3689 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -19,6 +19,8 @@
package org.apache.tez.dag.app.dag;
import javax.annotation.Nullable;
+
+import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashMap;
@@ -53,7 +55,9 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
@@ -275,12 +279,18 @@ public class RootInputInitializerManager {
@SuppressWarnings("unchecked")
@Override
public void onFailure(Throwable t) {
+ // Unwrap the real root cause of the failure; an UndeclaredThrowableException
+ // is thrown when the initializer is run via UGI.doAs
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
initializer.setComplete();
LOG.info(
"Failed InputInitializer for Input: " + initializer.getInput().getName() +
" on vertex " + initializer.getVertexLogIdentifier());
eventHandler
- .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(), t));
+ .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getName(),
+ new AMUserCodeException(Source.InputInitializer,t)));
}
}
@@ -294,6 +304,7 @@ public class RootInputInitializerManager {
private final InputInitializerContext context;
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final String vertexLogIdentifier;
+ private final TezVertexID vertexId;
private final StateChangeNotifier stateChangeNotifier;
private final List<String> notificationRegisteredVertices = Lists.newArrayList();
private final AppContext appContext;
@@ -306,6 +317,7 @@ public class RootInputInitializerManager {
this.initializer = initializer;
this.context = context;
this.vertexLogIdentifier = vertex.getLogIdentifier();
+ this.vertexId = vertex.getVertexId();
this.stateChangeNotifier = stateChangeNotifier;
this.appContext = appContext;
}
@@ -348,6 +360,7 @@ public class RootInputInitializerManager {
}
}
+ @SuppressWarnings("unchecked")
@Override
public void onStateUpdated(VertexStateUpdate event) {
if (isComplete()) {
@@ -357,7 +370,13 @@ public class RootInputInitializerManager {
" since initializer " + input.getName() + " is already complete.");
}
} else {
- initializer.onVertexStateUpdated(event);
+ try {
+ initializer.onVertexStateUpdated(event);
+ } catch (Exception e) {
+ appContext.getEventHandler().handle(
+ new VertexEventRootInputFailed(vertexId, input.getName(),
+ new AMUserCodeException(Source.InputInitializer,e)));
+ }
}
}
@@ -455,14 +474,15 @@ public class RootInputInitializerManager {
sendEvents(toForwardEvents);
}
+ @SuppressWarnings("unchecked")
private void sendEvents(List<InputInitializerEvent> events) {
if (events != null && !events.isEmpty()) {
try {
initializer.handleInputInitializerEvent(events);
} catch (Exception e) {
- throw new TezUncheckedException(
- "Initializer for input: " + getInput().getName() + " on vertex: " + getVertexLogIdentifier() +
- " failed to process events", e);
+ appContext.getEventHandler().handle(
+ new VertexEventRootInputFailed(vertexId, input.getName(),
+ new AMUserCodeException(Source.InputInitializer,e)));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
index 4ab4ae9..b7701d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
@@ -18,20 +18,21 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.records.TezVertexID;
public class VertexEventRootInputFailed extends VertexEvent {
private final String inputName;
- private final Throwable error;
+ private final AMUserCodeException error;
- public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, Throwable error) {
+ public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, AMUserCodeException error) {
super(vertexId, VertexEventType.V_ROOT_INPUT_FAILED);
this.inputName = inputName;
this.error = error;
}
- public Throwable getError() {
+ public AMUserCodeException getError() {
return this.error;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6dccf3a..cddcbd5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1686,9 +1686,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
else if (vertexEvent.getVertexState() == VertexState.FAILED) {
job.enactKill(
- DAGTerminationCause.VERTEX_FAILURE,
- vertexEvent.getVertexTerminationCause() == null ? VertexTerminationCause.OTHER_VERTEX_FAILURE
- : vertexEvent.getVertexTerminationCause());
+ DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
job.vertexFailed(vertex);
forceTransitionToKillWait = true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c182810..4a88949 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -335,7 +335,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.INITED, VertexState.FAILED),
VertexEventType.V_READY_TO_INIT,
new VertexInitializedTransition())
- .addTransition(VertexState.INITIALIZING, VertexState.FAILED,
+ .addTransition(VertexState.INITIALIZING,
+ EnumSet.of(VertexState.FAILED),
VertexEventType.V_ROOT_INPUT_FAILED,
new RootInputInitFailedTransition())
.addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
@@ -367,6 +368,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Transitions from INITED state
// SOURCE_VERTEX_STARTED - for sources which determine parallelism,
// they must complete before this vertex can start.
+ .addTransition(VertexState.INITED,
+ EnumSet.of(VertexState.FAILED),
+ VertexEventType.V_ROOT_INPUT_FAILED,
+ new RootInputInitFailedTransition())
.addTransition
(VertexState.INITED,
EnumSet.of(VertexState.INITED, VertexState.ERROR),
@@ -399,6 +404,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
+ .addTransition(VertexState.RUNNING,
+ EnumSet.of(VertexState.TERMINATING),
+ VertexEventType.V_ROOT_INPUT_FAILED,
+ new RootInputInitFailedTransition())
.addTransition(VertexState.RUNNING, VertexState.RUNNING,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -451,6 +460,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
@@ -483,6 +493,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new TaskCompletedAfterVertexSuccessTransition())
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
// after we are done reruns of source tasks should not affect
// us. These reruns may be triggered by other consumer vertices.
@@ -501,6 +512,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_START,
@@ -522,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.KILLED, VertexState.KILLED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_INIT,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_START,
@@ -541,6 +554,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR,
VertexState.ERROR,
EnumSet.of(VertexEventType.V_INIT,
+ VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
@@ -1654,7 +1668,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
vertex.setFinishTime();
- String diagnosticMsg = "Vertex failed/killed due to VertexManager failed. "
+ String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount
+ + " killedTasks:"
+ + vertex.killedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.abortVertex(State.FAILED);
+ return vertex.finished(VertexState.FAILED);
+ }
+ else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. "
+ "failedTasks:"
+ vertex.failedTaskCount
+ " killedTasks:"
@@ -1665,7 +1690,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
else {
//should never occur
- throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
+ throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -3202,19 +3227,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
private static class RootInputInitFailedTransition implements
- SingleArcTransition<VertexImpl, VertexEvent> {
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@Override
- public void transition(VertexImpl vertex, VertexEvent event) {
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event;
String msg = "Vertex Input: " + fe.getInputName()
+ " initializer failed, vertex=" + vertex.getLogIdentifier();
- if (fe.getError() != null) {
- msg = msg + ExceptionUtils.getStackTrace(fe.getError());
+ LOG.error(msg, fe.getError());
+ if (vertex.getState() == VertexState.RUNNING) {
+ vertex.addDiagnostic(msg
+ + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause()));
+ vertex.tryEnactKill(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE,
+ TaskTerminationCause.AM_USERCODE_FAILURE);
+ return VertexState.TERMINATING;
+ } else {
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg
+ + ", " + ExceptionUtils.getStackTrace(fe.getError().getCause()));
+ return VertexState.FAILED;
}
- LOG.error(msg);
- vertex.finished(VertexState.FAILED,
- VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, msg);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fdf0e07..ef2c7bd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -125,6 +125,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
@@ -4647,7 +4648,8 @@ public class TestVertexImpl {
super.runInputInitializers(inputs);
eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
.get(0).getName(),
- new RuntimeException("MockInitializerFailed")));
+ new AMUserCodeException(Source.InputInitializer,
+ new RuntimeException("MockInitializerFailed"))));
dispatcher.await();
}
@@ -4969,9 +4971,164 @@ public class TestVertexImpl {
initVertex(v1);
String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
assertTrue(diagnostics.contains(IIExceptionLocation.Initialize.name()));
+ Assert.assertEquals(VertexState.FAILED, v1.getState());
Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
}
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithIIException();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ // INIT_SUCCEEDED followed by INIT_FAILURE
+ initVertex(v1);
+ dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+ v1.getVertexId(), "input1", null));
+ dispatcher.await();
+
+ String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
+ assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name()));
+ Assert.assertEquals(VertexState.FAILED, v1.getState());
+ Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize2);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithIIException();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ initVertex(v1);
+ dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+ v1.getVertexId(), "input1", null));
+ dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+ VertexEventType.V_START));
+ dispatcher.await();
+
+ String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
+ assertTrue(diagnostics.contains(IIExceptionLocation.Initialize2.name()));
+ Assert.assertEquals(VertexState.FAILED, v1.getState());
+ Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+ initVertex(v1);
+ startVertex(v1);
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+ dispatcher.await();
+
+ // Wait for the initializer to be invoked - which may be a separate thread.
+ while (!initializer.initStarted.get()) {
+ Thread.sleep(10);
+ }
+ Assert.assertFalse(initializer.eventReceived.get());
+ Assert.assertFalse(initializer.initComplete.get());
+
+ // Signal the initializer by sending an event - via vertex1
+ InputInitializerEvent event = InputInitializerEvent.create("vertex2", "input1", null);
+ // Create taskId and taskAttemptId for the single task that exists in vertex1
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ TezEvent tezEvent = new TezEvent(event,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
+
+ // At least one task attempt must have succeeded; otherwise input initializer events won't be handled.
+ dispatcher.getEventHandler().handle(new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED));
+ dispatcher.getEventHandler().handle(new TaskEventTAUpdate(ta0_t0_v1, TaskEventType.T_ATTEMPT_SUCCEEDED));
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+ dispatcher.await();
+
+ // This causes v2 to fail because its II throws an exception in handleInputInitializerEvent
+ String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+ assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name()));
+ Assert.assertEquals(VertexState.FAILED, v2.getState());
+ Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+ }
+
+ @Test(timeout = 5000)
+ public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+ initVertex(v1);
+ startVertex(v1); // v2 would get the state update from v1
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.FAILED, v2.getState());
+ String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+ assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
+ Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException {
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+ VertexImplWithRunningInputInitializer v2 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+ initVertex(v1);
+ startVertex(v1); // v2 would get the state update from v1
+ // it should be OK to receive an INIT_SUCCEEDED event after an INIT_FAILED event
+ dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(
+ v2.getVertexId(), "input1", null));
+
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+ Assert.assertEquals(VertexState.FAILED, v2.getState());
+ String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
+ assertTrue(diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
+ Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
+ }
@InterfaceAudience.Private
public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
@@ -5119,6 +5276,9 @@ public class TestVertexImpl {
public static enum IIExceptionLocation {
Initialize,
+ Initialize2, // for test case that InputInitFailed after InputInitSucceeded
+ HandleInputInitializerEvent,
+ OnVertexStateUpdated
}
@InterfaceAudience.Private
@@ -5154,6 +5314,16 @@ public class TestVertexImpl {
if (exLocation == IIExceptionLocation.Initialize) {
throw new Exception(exLocation.name());
}
+ if (exLocation == IIExceptionLocation.Initialize2) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // InputInitializerManager is shut down once initialization has succeeded;
+ // catch the resulting interrupt and throw an exception to simulate the case
+ // of an init failure arriving after init succeeded
+ throw new Exception(exLocation.name());
+ }
+ }
context.registerForVertexStateUpdates("vertex1", null);
initStarted.set(true);
lock.lock();
@@ -5175,6 +5345,9 @@ public class TestVertexImpl {
@Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
Exception {
+ if (exLocation == IIExceptionLocation.HandleInputInitializerEvent) {
+ throw new Exception(exLocation.name());
+ }
initializerEvents.addAll(events);
if (initializerEvents.size() == numExpectedEvents) {
eventReceived.set(true);
@@ -5197,6 +5370,9 @@ public class TestVertexImpl {
}
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ if (exLocation == IIExceptionLocation.OnVertexStateUpdated) {
+ throw new RuntimeException(exLocation.name());
+ }
stateUpdates.add(stateUpdate);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ec29425/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index eef6ab3..0175d7b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
@@ -62,9 +63,10 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
-import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -84,8 +86,8 @@ import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.TestAMRecovery.DoNothingProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -210,7 +212,7 @@ public class TestExceptionPropagation {
* @throws Exception
*
*/
- @Test(timeout = 120000)
+ @Test(timeout = 180000)
public void testExceptionPropagationSession() throws Exception {
try {
startSessionClient();
@@ -309,6 +311,10 @@ public class TestExceptionPropagation {
EM_RouteInputErrorEventToSource,
// Not Supported yet
// EM_RouteInputSourceTaskFailedEventToDestination,
+
+ // II
+ II_Initialize, II_HandleInputInitializerEvents, II_OnVertexStateUpdated
+
}
/**
@@ -333,22 +339,31 @@ public class TestExceptionPropagation {
v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));
Vertex v2 =
- Vertex.create("v2",
- ProcessorDescriptor.create(SleepProcessor.class.getName())
- .setUserPayload(new SleepProcessorConfig(3).toUserPayload())
- , 1);
- v2.setVertexManagerPlugin(ShuffleVertexManagerWithException.getVMDesc(exLocation));
+ Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1);
+ v2.addDataSource("input2",
+ DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
+ InputInitializerWithException2.getIIDesc(payload), null));
dag.addVertex(v1)
- .addVertex(v2)
- .addEdge(Edge.create(v1, v2, EdgeProperty.create(
- EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
- .setUserPayload(payload),
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+ .addVertex(v2);
+ if (exLocation.name().startsWith("EM_")) {
+ dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(
+ EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
+ .setUserPayload(payload),
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+ } else {
+ // set the customized VertexManager here; it can't be used with a CustomEdge
+ v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exLocation));
+ dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
+ }
+
return dag;
}
+ // InputInitializer of vertex1
public static class InputInitializerWithException extends InputInitializer {
private ExceptionLocation exLocation;
@@ -380,7 +395,65 @@ public class TestExceptionPropagation {
}
}
- // input of vertex2
+ // InputInitializer of vertex2
+ public static class InputInitializerWithException2 extends InputInitializer {
+
+ private ExceptionLocation exLocation;
+ private Object condition = new Object();
+
+ public InputInitializerWithException2(
+ InputInitializerContext initializerContext) {
+ super(initializerContext);
+ this.exLocation =
+ ExceptionLocation.valueOf(new String(getContext().getUserPayload()
+ .deepCopyAsArray()));
+ }
+
+ @Override
+ public List<Event> initialize() throws Exception {
+ if (exLocation == ExceptionLocation.II_Initialize) {
+ throw new Exception(exLocation.name());
+ }
+ if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+ getContext().registerForVertexStateUpdates("v1", null);
+ }
+
+ if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents
+ || exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+ // wait until handleInputInitializerEvent() or onVertexStateUpdated() is invoked
+ synchronized (condition) {
+ condition.wait();
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+ throws Exception {
+ if (exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
+ throw new RuntimeException(exLocation.name());
+ }
+ }
+
+ @Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+ throws Exception {
+ if (exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
+ throw new Exception(exLocation.name());
+ }
+ super.onVertexStateUpdated(stateUpdate);
+ }
+
+ public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
+ return InputInitializerDescriptor.create(
+ InputInitializerWithException2.class.getName())
+ .setUserPayload(payload);
+ }
+ }
+
+ // Input of vertex2
public static class InputWithException extends AbstractLogicalInput {
private ExceptionLocation exLocation;
@@ -436,10 +509,12 @@ public class TestExceptionPropagation {
getContext().requestInitialMemory(0l, null); // mandatory call
if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
throw new Exception(this.exLocation.name());
- } else if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
- || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
- Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
- return Lists.newArrayList(errorEvent);
+ } else if ( getContext().getSourceVertexName().equals("v1")) {
+ if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
+ || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
+ Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
+ return Lists.newArrayList(errorEvent);
+ }
}
return null;
}
@@ -450,7 +525,7 @@ public class TestExceptionPropagation {
}
}
- // output of vertex1
+ // Output of vertex1
public static class OutputWithException extends AbstractLogicalOutput {
private ExceptionLocation exLocation;
@@ -497,8 +572,12 @@ public class TestExceptionPropagation {
List<Event> events = new ArrayList<Event>();
events.add(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])));
return events;
- }
- else {
+ } else if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
+ // send InputInitializerEvent to InputInitializer of v2
+ List<Event> events = new ArrayList<Event>();
+ events.add(InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0])));
+ return events;
+ } else {
return null;
}
}
@@ -576,6 +655,7 @@ public class TestExceptionPropagation {
}
}
+ // VertexManager of vertex1
public static class RootInputVertexManagerWithException extends RootInputVertexManager {
private ExceptionLocation exLocation;
@@ -618,12 +698,13 @@ public class TestExceptionPropagation {
}
}
- public static class ShuffleVertexManagerWithException extends ShuffleVertexManager {
+ // VertexManager of vertex2
+ public static class InputReadyVertexManagerWithException extends InputReadyVertexManager {
private ExceptionLocation exLocation;
private static final String Test_ExceptionLocation = "Test.ExceptionLocation";
- public ShuffleVertexManagerWithException(VertexManagerPluginContext context) {
+ public InputReadyVertexManagerWithException(VertexManagerPluginContext context) {
super(context);
}
@@ -666,12 +747,13 @@ public class TestExceptionPropagation {
Configuration conf = new Configuration();
conf.set(Test_ExceptionLocation, exLocation.name());
UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
- return VertexManagerPluginDescriptor.create(ShuffleVertexManagerWithException.class.getName())
+ return VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName())
.setUserPayload(payload);
}
}
- public static class CustomEdgeManager extends ScatterGatherEdgeManager {
+ // EdgeManager for edge linking vertex1 and vertex2
+ public static class CustomEdgeManager extends OneToOneEdgeManager {
private ExceptionLocation exLocation;