You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/09/11 22:22:42 UTC
[2/2] git commit: TEZ-850. Recovery unit tests. (Jeff Zhang via hitesh)
TEZ-850. Recovery unit tests. (Jeff Zhang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f65e65ae
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f65e65ae
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f65e65ae
Branch: refs/heads/master
Commit: f65e65aea8cbdb44dd65c6590fbe38dd84413a5a
Parents: d6589d3
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 13:22:21 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 13:22:21 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 45 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 12 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 240 +++---
.../tez/dag/app/dag/impl/TestDAGImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 514 +++++++++++
.../app/dag/impl/TestTaskAttemptRecovery.java | 178 ++++
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 629 +++++++++++---
.../dag/app/dag/impl/TestVertexRecovery.java | 860 +++++++++++++++++++
10 files changed, 2243 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 032438f..4fc7e83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
TEZ-1559. Add system tests for AM recovery.
+ TEZ-850. Recovery unit tests.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 24d2e6b..daaa81b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -139,7 +139,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final TaskAttemptListener taskAttemptListener;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private final Object tasksSyncHandle = new Object();
-
+
private volatile boolean committedOrAborted = false;
private volatile boolean allOutputsCommitted = false;
boolean commitAllOutputsOnSuccess = true;
@@ -157,7 +157,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final StateChangeNotifier entityUpdateTracker;
volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
- private Map<String, Edge> edges = new HashMap<String, Edge>();
+ @VisibleForTesting
+ Map<String, Edge> edges = new HashMap<String, Edge>();
private TezCounters dagCounters = new TezCounters();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
@@ -359,14 +360,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private DAGTerminationCause terminationCause;
private Credentials credentials;
- private long initTime;
- private long startTime;
- private long finishTime;
-
+ @VisibleForTesting
+ long initTime;
+ @VisibleForTesting
+ long startTime;
+ @VisibleForTesting
+ long finishTime;
+
Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
private DAGState recoveredState = DAGState.NEW;
- private boolean recoveryCommitInProgress = false;
+ @VisibleForTesting
+ boolean recoveryCommitInProgress = false;
Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
static class VertexGroupInfo {
@@ -381,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount());
for (PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) {
- edgeMergedInputs.put(edgInfo.getDestVertexName(),
+ edgeMergedInputs.put(edgInfo.getDestVertexName(),
DagTypeConverters.convertInputDescriptorFromDAGPlan(edgInfo.getMergedInput()));
}
outputs = Sets.newHashSet(groupInfo.getOutputsList());
@@ -706,7 +711,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
}
-
+
private boolean commitOutput(String outputName, OutputCommitter outputCommitter) {
final OutputCommitter committer = outputCommitter;
try {
@@ -723,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
return false;
}
-
+
private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
if (this.committedOrAborted) {
LOG.info("Ignoring multiple output commit/abort");
@@ -731,7 +736,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
LOG.info("Calling DAG commit/abort for dag: " + getID());
this.committedOrAborted = true;
-
+
boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess;
boolean failedWhileCommitting = false;
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
@@ -772,7 +777,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.info("No output committers for vertex: " + vertex.getName());
continue;
}
- Map<String, OutputCommitter> outputCommitters =
+ Map<String, OutputCommitter> outputCommitters =
new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
Set<String> sharedOutputs = vertex.getSharedOutputs();
// remove shared outputs
@@ -793,7 +798,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.info("Committing output: " + entry.getKey() + " for vertex: "
+ vertex.getVertexId());
if (vertex.getState() != VertexState.SUCCEEDED) {
- throw new TezUncheckedException("Vertex: " + vertex.getName() +
+ throw new TezUncheckedException("Vertex: " + vertex.getName() +
" not in SUCCEEDED state. State= " + vertex.getState());
}
if (!commitOutput(entry.getKey(), entry.getValue())) {
@@ -803,11 +808,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
}
-
+
if (failedWhileCommitting) {
LOG.info("DAG: " + getID() + " failed while committing");
}
-
+
if (!dagSucceeded || failedWhileCommitting) {
// come here because dag failed or
// dag succeeded and all or none semantics were on and a commit failed
@@ -1026,9 +1031,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (finishTime == 0) {
setFinishTime();
}
-
+
boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED);
-
+
if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) {
finalState = DAGState.FAILED;
trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
@@ -1057,7 +1062,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.info("DAG: " + getID() + " finished with state: " + finalState);
return finalState;
}
-
+
private DAGStatus.State getDAGStatusFromState(DAGState finalState) {
switch (finalState) {
case NEW:
@@ -1631,7 +1636,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", numFailedVertices=" + job.numFailedVertices
+ ", numKilledVertices=" + job.numKilledVertices
+ ", numVertices=" + job.numVertices);
-
+
if (failed) {
return DAGState.TERMINATING;
}
@@ -1724,7 +1729,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
addDiagnostic("Vertex re-running"
+ ", vertexName=" + vertex.getName()
+ ", vertexId=" + vertex.getVertexId());
-
+
if (!commitAllOutputsOnSuccess) {
// partial output may already have been committed. fail if so
List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName());
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e63dbf5..7ba90b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -137,7 +137,8 @@ public class TaskAttemptImpl implements TaskAttempt,
private String nodeHttpAddress;
private String nodeRackName;
- private TaskAttemptStatus reportedStatus;
+ @VisibleForTesting
+ TaskAttemptStatus reportedStatus;
private DAGCounter localityCounter;
// Used to store locality information when
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ecd2bcc..1dd711b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -116,7 +116,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected final AppContext appContext;
private final Resource taskResource;
private final ContainerContext containerContext;
- private long scheduledTime;
+ @VisibleForTesting
+ long scheduledTime;
private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
@@ -124,7 +125,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
- private int numberUncompletedAttempts = 0;
+ @VisibleForTesting
+ int numberUncompletedAttempts = 0;
private boolean historyTaskStartGenerated = false;
@@ -290,11 +292,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
//saying COMMIT_PENDING
private TezTaskAttemptID commitAttempt;
- private TezTaskAttemptID successfulAttempt;
+ @VisibleForTesting
+ TezTaskAttemptID successfulAttempt;
@VisibleForTesting
int failedAttempts;
- private int finishedAttempts;//finish are total of success, failed and killed
+ @VisibleForTesting
+ int finishedAttempts;//finish are total of success, failed and killed
private final boolean leafVertex;
private TaskState recoveredState = TaskState.NEW;
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ff556ba..31240cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -205,9 +205,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
//fields initialized in init
- private int numStartedSourceVertices = 0;
- private int numInitedSourceVertices = 0;
- private int numRecoveredSourceVertices = 0;
+ @VisibleForTesting
+ int numStartedSourceVertices = 0;
+ @VisibleForTesting
+ int numInitedSourceVertices = 0;
+ @VisibleForTesting
+ int numRecoveredSourceVertices = 0;
private int distanceFromRoot = 0;
@@ -238,7 +241,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new VertexStateChangedCallback();
private VertexState recoveredState = VertexState.NEW;
- private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+ @VisibleForTesting
+ List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
private boolean vertexAlreadyInitialized = false;
protected static final
@@ -254,7 +258,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_INIT,
new InitTransition())
- .addTransition(VertexState.NEW,
+ .addTransition(VertexState.NEW,
EnumSet.of(VertexState.NEW),
VertexEventType.V_NULL_EDGE_INITIALIZED,
new NullEdgeInitializedTransition())
@@ -308,7 +312,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Transitions from INITIALIZING state
.addTransition(VertexState.INITIALIZING,
- EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
+ EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
VertexState.FAILED),
VertexEventType.V_ROOT_INPUT_INITIALIZED,
new RootInputInitializedTransition())
@@ -341,14 +345,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITIALIZING, VertexState.ERROR,
VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- .addTransition(VertexState.INITIALIZING,
+ .addTransition(VertexState.INITIALIZING,
EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
VertexState.FAILED),
VertexEventType.V_NULL_EDGE_INITIALIZED,
new NullEdgeInitializedTransition())
// Transitions from INITED state
- // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
+ // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
// they must complete before this vertex can start.
.addTransition
(VertexState.INITED,
@@ -358,14 +362,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITED, VertexState.INITED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
- .addTransition(VertexState.INITED,
+ .addTransition(VertexState.INITED,
EnumSet.of(VertexState.INITED),
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
new OneToOneSourceSplitTransition())
.addTransition(VertexState.INITED, VertexState.INITED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
- .addTransition(VertexState.INITED,
+ .addTransition(VertexState.INITED,
EnumSet.of(VertexState.RUNNING, VertexState.INITED),
VertexEventType.V_START,
new StartTransition())
@@ -393,7 +397,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
- .addTransition(VertexState.RUNNING,
+ .addTransition(VertexState.RUNNING,
EnumSet.of(VertexState.RUNNING),
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
new OneToOneSourceSplitTransition())
@@ -455,7 +459,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
ROUTE_EVENT_TRANSITION)
.addTransition(
- VertexState.SUCCEEDED,
+ VertexState.SUCCEEDED,
EnumSet.of(VertexState.FAILED, VertexState.ERROR),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedAfterVertexSuccessTransition())
@@ -551,17 +555,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
//changing fields while the vertex is running
- private int numTasks;
- private int completedTaskCount = 0;
- private int succeededTaskCount = 0;
- private int failedTaskCount = 0;
- private int killedTaskCount = 0;
-
- private long initTimeRequested; // Time at which INIT request was received.
- private long initedTime; // Time when entering state INITED
- private long startTimeRequested; // Time at which START request was received.
- private long startedTime; // Time when entering state STARTED
- private long finishTime;
+ @VisibleForTesting
+ int numTasks;
+ @VisibleForTesting
+ int completedTaskCount = 0;
+ @VisibleForTesting
+ int succeededTaskCount = 0;
+ @VisibleForTesting
+ int failedTaskCount = 0;
+ @VisibleForTesting
+ int killedTaskCount = 0;
+
+ @VisibleForTesting
+ long initTimeRequested; // Time at which INIT request was received.
+ @VisibleForTesting
+ long initedTime; // Time when entering state INITED
+ @VisibleForTesting
+ long startTimeRequested; // Time at which START request was received.
+ @VisibleForTesting
+ long startedTime; // Time when entering state STARTED
+ @VisibleForTesting
+ long finishTime;
private float progress;
private final TezVertexID vertexId; //runtime assigned id.
@@ -576,14 +590,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Map<Vertex, Edge> targetVertices;
Set<Edge> uninitializedEdges = Sets.newHashSet();
- private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
rootInputDescriptors;
- private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
+ private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
additionalOutputs;
private Map<String, OutputCommitter> outputCommitters;
private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate
- .getDefaultSinglePhysicalInputSpecUpdate();
+ .getDefaultSinglePhysicalInputSpecUpdate();
private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
private Set<String> inputsWithInitializers;
private int numInitializedInputs;
@@ -598,7 +612,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private RootInputInitializerManager rootInputInitializerManager;
VertexManager vertexManager;
-
+
private final UserGroupInformation dagUgi;
private boolean parallelismSet = false;
@@ -607,20 +621,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private AtomicBoolean committed = new AtomicBoolean(false);
private AtomicBoolean aborted = new AtomicBoolean(false);
private boolean commitVertexOutputs = false;
-
+
private Map<String, VertexGroupInfo> dagVertexGroups;
-
+
private TaskLocationHint taskLocationHints[];
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
private final String javaOpts;
private final ContainerContext containerContext;
private VertexTerminationCause terminationCause;
-
+
private String logIdentifier;
- private boolean recoveryCommitInProgress = false;
+ @VisibleForTesting
+ boolean recoveryCommitInProgress = false;
private boolean summaryCompleteSeen = false;
- private boolean hasCommitter = false;
+ @VisibleForTesting
+ boolean hasCommitter = false;
private boolean vertexCompleteSeen = false;
private Map<String,EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null;
private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
@@ -658,7 +674,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
logLocationHints(this.vertexName, vertexLocationHint);
}
setTaskLocationHints(vertexLocationHint);
-
+
this.dagUgi = appContext.getCurrentDAG().getDagUGI();
this.taskResource = DagTypeConverters
@@ -897,12 +913,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.readLock.unlock();
}
}
-
+
@Override
public TaskLocationHint getTaskLocationHint(TezTaskID taskId) {
this.readLock.lock();
try {
- if (taskLocationHints == null ||
+ if (taskLocationHints == null ||
taskLocationHints.length <= taskId.getId()) {
return null;
}
@@ -1081,8 +1097,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
private void setTaskLocationHints(VertexLocationHint vertexLocationHint) {
- if (vertexLocationHint != null &&
- vertexLocationHint.getTaskLocationHints() != null &&
+ if (vertexLocationHint != null &&
+ vertexLocationHint.getTaskLocationHints() != null &&
!vertexLocationHint.getTaskLocationHints().isEmpty()) {
List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints();
taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]);
@@ -1154,7 +1170,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
// Restore any rootInputSpecUpdates which may have been registered during a parallelism
// update.
if (rootInputSpecUpdates != null) {
@@ -1166,7 +1182,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
writeLock.unlock();
}
}
- Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: "
+ Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: "
+ parallelism + " for vertex: " + logIdentifier);
setVertexLocationHint(vertexLocationHint);
writeLock.lock();
@@ -1175,7 +1191,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier);
return false;
}
-
+
parallelismSet = true;
// Input initializer/Vertex Manager/1-1 split expected to set parallelism.
@@ -1185,7 +1201,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
"Vertex state is not Initializing. Value: " + getState()
+ " for vertex: " + logIdentifier);
}
-
+
if(sourceEdgeManagers != null) {
for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
LOG.info("Replacing edge manager for source:"
@@ -1223,7 +1239,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
stateChangeNotifier.stateChanged(vertexId,
new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
this.createTasks();
- LOG.info("Vertex " + getVertexId() +
+ LOG.info("Vertex " + getVertexId() +
" parallelism set to " + parallelism);
if (canInitVertex()) {
getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
@@ -1244,7 +1260,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return false;
}
if (parallelism == numTasks) {
- LOG.info("setParallelism same as current value: " + parallelism +
+ LOG.info("setParallelism same as current value: " + parallelism +
" for vertex: " + logIdentifier);
Preconditions.checkArgument(sourceEdgeManagers != null,
"Source edge managers or RootInputSpecs must be set when not changing parallelism");
@@ -1258,7 +1274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for (Edge edge : sourceVertices.values()) {
edge.startEventBuffering();
}
-
+
// assign to local variable of LinkedHashMap to make sure that changing
// type of task causes compile error. We depend on LinkedHashMap for order
LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
@@ -1281,14 +1297,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Removing task: " + entry.getKey());
iter.remove();
}
- LOG.info("Vertex " + logIdentifier +
+ LOG.info("Vertex " + logIdentifier +
" parallelism set to " + parallelism + " from " + numTasks);
int oldNumTasks = numTasks;
this.numTasks = parallelism;
stateChangeNotifier.stateChanged(vertexId,
new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
assert tasks.size() == numTasks;
-
+
// set new edge managers
if(sourceEdgeManagers != null) {
for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
@@ -1320,17 +1336,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
edge.stopEventBuffering();
}
}
-
+
for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
Edge edge = entry.getValue();
- if (edge.getEdgeProperty().getDataMovementType()
+ if (edge.getEdgeProperty().getDataMovementType()
== DataMovementType.ONE_TO_ONE) {
// inform these target vertices that we have changed parallelism
- VertexEventOneToOneSourceSplit event =
+ VertexEventOneToOneSourceSplit event =
new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
getVertexId(),
- ((originalOneToOneSplitSource!=null) ?
- originalOneToOneSplitSource : getVertexId()),
+ ((originalOneToOneSplitSource!=null) ?
+ originalOneToOneSplitSource : getVertexId()),
numTasks);
getEventHandler().handle(event);
}
@@ -1339,7 +1355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} finally {
writeLock.unlock();
}
-
+
return true;
}
@@ -1733,7 +1749,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// no code, for now
}
- private ContainerContext getContainerContext(int taskIdx) {
+ @VisibleForTesting
+ ContainerContext getContainerContext(int taskIdx) {
if (taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(vertexName, taskIdx)) {
String jvmOpts = taskSpecificLaunchCmdOpts.getTaskSpecificOption(javaOpts, vertexName, taskIdx);
ContainerContext context = new ContainerContext(this.localResources,
@@ -1806,9 +1823,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (rootInputDescriptors != null) {
LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+ rootInputDescriptors);
- for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
+ for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
: rootInputDescriptors.values()) {
- if (input.getControllerDescriptor() != null &&
+ if (input.getControllerDescriptor() != null &&
input.getControllerDescriptor().getClassName() != null) {
if (inputsWithInitializers == null) {
inputsWithInitializers = Sets.newHashSet();
@@ -1875,7 +1892,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
checkTaskLimits();
return VertexState.INITED;
}
-
+
private void assignVertexManager() {
boolean hasBipartite = false;
boolean hasOneToOne = false;
@@ -1895,12 +1912,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
hasCustom = true;
break;
default:
- throw new TezUncheckedException("Unknown data movement type: " +
+ throw new TezUncheckedException("Unknown data movement type: " +
edge.getEdgeProperty().getDataMovementType());
}
}
}
-
+
boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
if (hasUserVertexManager) {
@@ -2008,6 +2025,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.finished(VertexState.ERROR);
}
+ // recover from recover log, should recover to running
+ // desiredState must be RUNNING based on above code
VertexState endState;
switch (vertex.recoveredState) {
case NEW:
@@ -2086,6 +2105,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
endState = VertexState.RUNNING;
} else {
+ // why succeeded here
endState = VertexState.SUCCEEDED;
vertex.finished(endState);
}
@@ -2249,7 +2269,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
(vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) ||
vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)),
"Not connected to vertex " + otherVertex.getName() + " from vertex: " + vertex.logIdentifier);
- LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() +
+ LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() +
" at vertex : " + vertex.logIdentifier);
vertex.uninitializedEdges.remove(edge);
if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) {
@@ -2596,7 +2616,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (state.equals(VertexState.FAILED)) {
return state;
}
- // TODO move before to handle NEW state
+ // TODO move before to handle NEW state
if (vertex.targetVertices != null) {
for (Edge e : vertex.targetVertices.values()) {
if (e.getEdgeManager() == null) {
@@ -2619,7 +2639,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
@@ -2633,7 +2653,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.appContext.getTaskScheduler().getNumClusterNodes(),
vertex.getTaskResource(),
vertex.appContext.getTaskScheduler().getTotalResources());
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
for (String inputName : vertex.inputsWithInitializers) {
inputList.add(vertex.rootInputDescriptors.get(inputName));
@@ -2646,7 +2666,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} else {
boolean hasOneToOneUninitedSource = false;
for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
- if (entry.getValue().getEdgeProperty().getDataMovementType() ==
+ if (entry.getValue().getEdgeProperty().getDataMovementType() ==
DataMovementType.ONE_TO_ONE) {
if (entry.getKey().getTotalTasks() == -1) {
hasOneToOneUninitedSource = true;
@@ -2662,7 +2682,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
return VertexState.INITIALIZING;
}
- throw new TezUncheckedException(vertex.getVertexId() +
+ throw new TezUncheckedException(vertex.getVertexId() +
" has -1 tasks but does not have input initializers, " +
"1-1 uninited sources or custom vertex manager to set it at runtime");
}
@@ -2676,14 +2696,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.appContext.getTaskScheduler().getNumClusterNodes(),
vertex.getTaskResource(),
vertex.appContext.getTaskScheduler().getTotalResources());
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
for (String inputName : vertex.inputsWithInitializers) {
inputList.add(vertex.rootInputDescriptors.get(inputName));
}
LOG.info("Starting root input initializers: "
+ vertex.inputsWithInitializers.size());
- // special case when numTasks>0 and still we want to stay in initializing
+ // special case when numTasks>0 and still we want to stay in initializing
// state. This is handled in RootInputInitializedTransition specially.
vertex.initWaitsForRootInitializers = true;
vertex.rootInputInitializerManager.runInputInitializers(inputList);
@@ -2711,7 +2731,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Resource vertexTaskResource, Resource totalResource) {
return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier);
}
-
+
private boolean initializeVertexInInitializingState() {
boolean isInitialized = initializeVertex();
if (!isInitialized) {
@@ -2721,7 +2741,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return true;
}
-
+
void startIfPossible() {
if (startSignalPending) {
// Trigger a start event to ensure route events are seen before
@@ -2735,7 +2755,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public static class VertexInitializedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
+
static VertexState doTransition(VertexImpl vertex) {
Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier);
boolean isInitialized = vertex.initializeVertexInInitializingState();
@@ -2744,15 +2764,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
vertex.startIfPossible();
- return VertexState.INITED;
+ return VertexState.INITED;
}
-
+
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
return doTransition(vertex);
}
}
-
+
// present in most transitions so that the initializer thread can be shutdown properly
public static class RootInputInitializedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -2773,15 +2793,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// All inputs initialized, shutdown the initializer.
vertex.rootInputInitializerManager.shutdown();
}
-
+
// done. check if we need to do the initialization
- if (vertex.getState() == VertexState.INITIALIZING &&
+ if (vertex.getState() == VertexState.INITIALIZING &&
vertex.initWaitsForRootInitializers) {
// set the wait flag to false
vertex.initWaitsForRootInitializers = false;
// initialize vertex if possible and needed
if (vertex.canInitVertex()) {
- Preconditions.checkState(vertex.numTasks >= 0,
+ Preconditions.checkState(vertex.numTasks >= 0,
"Parallelism should have been set by now for vertex: " + vertex.logIdentifier);
return VertexInitializedTransition.doTransition(vertex);
}
@@ -2795,10 +2815,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
- VertexEventOneToOneSourceSplit splitEvent =
+ VertexEventOneToOneSourceSplit splitEvent =
(VertexEventOneToOneSourceSplit)event;
TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
-
+
if (vertex.originalOneToOneSplitSource != null) {
VertexState state = vertex.getState();
Preconditions
@@ -2813,25 +2833,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
// ignore another split event that may have come from a different
// path in the DAG. We have already split because of that source
- LOG.info("Ignoring split of vertex " + vertex.getVertexId() +
- " because of split in vertex " + originalSplitSource +
+ LOG.info("Ignoring split of vertex " + vertex.getVertexId() +
+ " because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
" numTasks " + splitEvent.getNumTasks());
return state;
}
// cannot split from multiple sources
- throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
- " asked to split by: " + originalSplitSource +
+ throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
+ " asked to split by: " + originalSplitSource +
" but was already split by:" + vertex.originalOneToOneSplitSource);
}
-
- LOG.info("Splitting vertex " + vertex.getVertexId() +
- " because of split in vertex " + originalSplitSource +
+
+ LOG.info("Splitting vertex " + vertex.getVertexId() +
+ " because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
" numTasks " + splitEvent.getNumTasks());
vertex.originalOneToOneSplitSource = originalSplitSource;
vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
- if (vertex.getState() == VertexState.RUNNING ||
+ if (vertex.getState() == VertexState.RUNNING ||
vertex.getState() == VertexState.INITED) {
return vertex.getState();
} else {
@@ -2861,19 +2881,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
vertex.numStartedSourceVertices++;
LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
- " for vertex: " + vertex.getVertexId() + " numStartedSources: " +
+ " for vertex: " + vertex.getVertexId() + " numStartedSources: " +
vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
-
+
if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) {
LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: "
+ vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
return;
}
-
- // vertex meets external start dependency conditions. Save this signal in
+
+ // vertex meets external start dependency conditions. Save this signal in
// case we are not ready to start now and need to start later
vertex.startSignalPending = true;
-
+
if (vertex.getState() != VertexState.INITED) {
// vertex itself is not ready to start. External dependencies have already
// notified us.
@@ -2883,14 +2903,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ vertex.uninitializedEdges.size());
return;
}
-
+
// vertex is inited and all dependencies are ready. Inited vertex means
// parallelism must be set already and edges defined
Preconditions.checkState(
(vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()),
"Cannot start vertex that is not completely defined. Vertex: "
+ vertex.logIdentifier + " numTasks: " + vertex.numTasks);
-
+
vertex.startIfPossible();
}
}
@@ -2906,14 +2926,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " initWaitsForRootInitializers: " + initWaitsForRootInitializers);
return false;
}
-
- public static class StartWhileInitializingTransition implements
+
+ public static class StartWhileInitializingTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
// vertex state machine does not start itself in the initializing state
- // this start event can only come directly from the DAG. That means this
+ // this start event can only come directly from the DAG. That means this
// is a top level vertex of the dag
Preconditions.checkState(
(vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
@@ -2926,10 +2946,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public static class StartTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
+
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
- Preconditions.checkState(vertex.getState() == VertexState.INITED,
+ Preconditions.checkState(vertex.getState() == VertexState.INITED,
"Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
vertex.startTimeRequested = vertex.clock.getTime();
return vertex.startVertex();
@@ -2937,7 +2957,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
private VertexState startVertex() {
- Preconditions.checkState(getState() == VertexState.INITED,
+ Preconditions.checkState(getState() == VertexState.INITED,
"Vertex must be inited " + logIdentifier);
startedTime = clock.getTime();
@@ -3095,7 +3115,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
case ROOT_INPUT_INIT_FAILURE:
case COMMIT_FAILURE:
- case INVALID_NUM_OF_TASKS:
+ case INVALID_NUM_OF_TASKS:
case INIT_FAILURE:
case INTERNAL_ERROR:
case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
@@ -3172,7 +3192,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(vertex, task);
} else if (taskEvent.getState() == TaskState.FAILED) {
- LOG.info("Failing vertex: " + vertex.logIdentifier +
+ LOG.info("Failing vertex: " + vertex.logIdentifier +
" because task failed: " + taskEvent.getTaskID());
vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
forceTransitionToKillWait = true;
@@ -3220,7 +3240,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.succeededTaskCount--;
}
}
-
+
private static class VertexNoTasksCompletedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -3229,7 +3249,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return VertexImpl.checkVertexForCompletion(vertex);
}
}
-
+
private static class TaskCompletedAfterVertexSuccessTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@Override
@@ -3241,12 +3261,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vEvent.getState() == TaskState.FAILED) {
finalState = VertexState.FAILED;
finalStatus = VertexStatus.State.FAILED;
- diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() +
+ diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() +
" failed after vertex succeeded.";
} else {
finalState = VertexState.ERROR;
finalStatus = VertexStatus.State.ERROR;
- diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() +
+ diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() +
" completed with state " + vEvent.getState() + " after vertex succeeded.";
}
LOG.info(diagnosticMsg);
@@ -3599,14 +3619,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Nullable
@Override
- public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
getAdditionalInputs() {
return this.rootInputDescriptors;
}
@Nullable
@Override
- public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
+ public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
getAdditionalOutputs() {
return this.additionalOutputs;
}
@@ -3696,7 +3716,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+ (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
if (rootInputDescriptors != null) {
- for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
@@ -3729,18 +3749,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return outputSpecList;
}
-
+
//TODO Eventually remove synchronization.
@Override
public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
return groupInputSpecList;
}
-
+
@Override
public synchronized void addSharedOutputs(Set<String> outputs) {
this.sharedOutputs.addAll(outputs);
}
-
+
@Override
public synchronized Set<String> getSharedOutputs() {
return this.sharedOutputs;
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ec05815..aba4fd9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -111,7 +111,7 @@ public class TestDAGImpl {
private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
private DAGPlan dagPlan;
private TezDAGID dagId;
- private Configuration conf;
+ private static Configuration conf;
private DrainDispatcher dispatcher;
private Credentials fsTokens;
private AppContext appContext;
@@ -344,7 +344,7 @@ public class TestDAGImpl {
}
// Create a plan with 3 vertices: A, B, C. Group(A,B)->C
- private DAGPlan createGroupDAGPlan() {
+ static DAGPlan createGroupDAGPlan() {
LOG.info("Setting up group dag plan");
int dummyTaskCount = 1;
Resource dummyTaskResource = Resource.newInstance(1, 1);
@@ -381,7 +381,7 @@ public class TestDAGImpl {
return dag.createDag(conf);
}
- private DAGPlan createTestDAGPlan() {
+ public static DAGPlan createTestDAGPlan() {
LOG.info("Setting up dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
new file mode 100644
index 0000000..da0186e
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestDAGRecovery {
+  // Unit tests for DAGImpl recovery: replay history events via restoreFromEvent, then handle a DAGEventRecoverEvent.
+  private DAGImpl dag;
+  private EventHandler mockEventHandler;
+
+  private String user = "root";
+  private String dagName = "dag1";
+
+  private AppContext mockAppContext;
+  private ApplicationId appId = ApplicationId.newInstance(
+      System.currentTimeMillis(), 1);
+  private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+  private long initTime = 100L;
+  private long startTime = initTime + 200L;
+  private long commitStartTime = startTime + 200L;
+  private long finishTime = commitStartTime + 200L;
+  // Builds a fresh DAGImpl from TestDAGImpl.createTestDAGPlan(); every event it emits is captured by mockEventHandler.
+  @Before
+  public void setUp() {
+    // Deep stubs allow stubbing getCurrentDAG().getDagUGI() without creating a separate DAG mock.
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
+    mockEventHandler = mock(EventHandler.class);
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
+            mock(TaskAttemptListener.class), new Credentials(),
+            new SystemClock(), user, mock(TaskHeartbeatHandler.class),
+            mockAppContext);
+  }
+  // Asserts that the freshly built DAG has no vertices, edges, scheduler or recovered commits yet.
+  private void assertNewState() {
+    assertEquals(0, dag.getVertices().size());
+    assertEquals(0, dag.edges.size());
+    assertNull(dag.dagScheduler);
+    assertFalse(dag.recoveryCommitInProgress);
+    assertEquals(0, dag.recoveredGroupCommits.size());
+  }
+  // Each restoreFrom* helper replays one history event into the DAG and asserts the state it restores.
+  private void restoreFromDAGInitializedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
+            dagName));
+    assertEquals(DAGState.INITED, recoveredState);
+    assertEquals(initTime, dag.initTime);
+    assertEquals(6, dag.getVertices().size());
+    assertEquals(6, dag.edges.size());
+    assertNotNull(dag.dagScheduler);
+  }
+
+  private void restoreFromDAGStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGStartedEvent(dagId, startTime, user,
+            dagName));
+    assertEquals(startTime, dag.startTime);
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromDAGCommitStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGCommitStartedEvent(dagId, commitStartTime));
+    assertTrue(dag.recoveryCommitInProgress);
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromVertexGroupCommitStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new VertexGroupCommitStartedEvent(dagId, "g1",
+            commitStartTime));
+    assertEquals(1, dag.recoveredGroupCommits.size());
+    assertFalse(dag.recoveredGroupCommits.get("g1").booleanValue());
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromVertexGroupCommitFinishedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new VertexGroupCommitFinishedEvent(dagId, "g1",
+            commitStartTime + 100L));
+    assertEquals(1, dag.recoveredGroupCommits.size());
+    assertTrue(dag.recoveredGroupCommits.get("g1").booleanValue());
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromDAGFinishedEvent(DAGState finalState) {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
+            finalState, "", new TezCounters(), user, dagName));
+    assertEquals(finishTime, dag.finishTime);
+    assertFalse(dag.recoveryCommitInProgress);
+    assertEquals(finalState, recoveredState);
+  }
+
+  /**
+   * New -> RecoverTransition: nothing was restored, so DAG_INIT and DAG_START are re-sent.
+   */
+  @Test
+  public void testDAGRecovery_FromNew() {
+    assertNewState();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    ArgumentCaptor<DAGEvent> eventCaptor =
+        ArgumentCaptor.forClass(DAGEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<DAGEvent> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    assertEquals(DAGEventType.DAG_INIT, events.get(0).getType());
+    assertEquals(DAGEventType.DAG_START, events.get(1).getType());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_FromInited() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // expect one recover event (desired state RUNNING) for each of the 2 root vertices
+    ArgumentCaptor<VertexEvent> eventCaptor =
+        ArgumentCaptor.forClass(VertexEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
+    assertEquals(2, vertexEvents.size());
+    for (VertexEvent vEvent : vertexEvents) {
+      assertTrue(vEvent instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
+      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+    }
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_FromStarted() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // expect one recover event (desired state RUNNING) for each of the 2 root vertices
+    ArgumentCaptor<VertexEvent> eventCaptor =
+        ArgumentCaptor.forClass(VertexEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
+    assertEquals(2, vertexEvents.size());
+    for (VertexEvent vEvent : vertexEvents) {
+      assertTrue(vEvent instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
+      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+    }
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_SUCCEEDED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.SUCCEEDED, dag.getState());
+
+    // all 6 vertices are recovered to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent (FAILED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_FAILED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.FAILED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // all 6 vertices are recovered to FAILED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent (KILLED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_KILLED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.KILLED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+
+    // all 6 vertices are recovered to KILLED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent (ERROR) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_ERROR() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.ERROR);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.ERROR, dag.getState());
+
+    // for an ERROR DAG, all 6 vertices are recovered to FAILED (not ERROR)
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.ERROR, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGCommitStartedEvent -> RecoverTransition (unfinished commit: DAG ends FAILED)
+   */
+  @Test
+  public void testDAGRecovery_COMMIT_STARTED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGCommitStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // vertices are still recovered to SUCCEEDED even though the DAG itself fails
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGCommitStartedEvent -> restoreFromDAGFinishedEvent (SUCCEEDED) ->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_COMMIT_STARTED_Finished_SUCCEEDED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromDAGCommitStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.SUCCEEDED, dag.getState());
+
+    // all 6 vertices are recovered to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVertexGroupCommitStartedEvent -> RecoverTransition (unfinished group commit: DAG ends FAILED)
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_STARTED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromVertexGroupCommitStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // vertices are still recovered to SUCCEEDED even though the DAG itself fails
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVertexGroupCommitStartedEvent -> restoreFromVertexGroupCommitFinishedEvent ->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_STARTED_FINISHED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromVertexGroupCommitStartedEvent();
+    restoreFromVertexGroupCommitFinishedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // expect a recover event for each of the 2 root vertices; the DAG stays RUNNING
+    // NOTE(review): on Mockito 1.x any(Class) does not type-check, so this only counts handle() calls - confirm
+    verify(mockEventHandler, times(2)).handle(
+        any(VertexEventRecoverVertex.class));
+    assertEquals(DAGState.RUNNING, dag.getState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVertexGroupCommitStartedEvent -> restoreFromVertexGroupCommitFinishedEvent ->
+   * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_Finished() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromVertexGroupCommitStartedEvent();
+    restoreFromVertexGroupCommitFinishedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+    // NOTE(review): unlike the other finished-state tests, the final dag.getState() is not asserted here
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    // all 6 vertices are recovered to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // DAGAppMasterEventDAGFinished is sent last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
new file mode 100644
index 0000000..3b04cf6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestTaskAttemptRecovery {
+
+ private TaskAttemptImpl ta;
+ private EventHandler mockEventHandler;
+ private long startTime = System.currentTimeMillis();
+ private long finishTime = startTime + 5000;
+
+ private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+ private String vertexName = "v1";
+
+ @Before
+ public void setUp() {
+ mockEventHandler = mock(EventHandler.class);
+ TezTaskID taskId =
+ TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
+ ta =
+ new TaskAttemptImpl(taskId, 0, mockEventHandler,
+ mock(TaskAttemptListener.class), new Configuration(),
+ new SystemClock(), mock(TaskHeartbeatHandler.class),
+ mock(AppContext.class), false, Resource.newInstance(1, 1),
+ mock(ContainerContext.class), false);
+ }
+
+ private void restoreFromTAStartEvent() {
+ TaskAttemptState recoveredState =
+ ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
+ startTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+ assertEquals(startTime, ta.getLaunchTime());
+ assertEquals(TaskAttemptState.RUNNING, recoveredState);
+ }
+
+ private void restoreFromTAFinishedEvent(TaskAttemptState state) {
+ String diag = "test_diag";
+ TezCounters counters = mock(TezCounters.class);
+
+ TaskAttemptState recoveredState =
+ ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+ startTime, finishTime, state, diag, counters));
+ assertEquals(startTime, ta.getLaunchTime());
+ assertEquals(finishTime, ta.getFinishTime());
+ assertEquals(counters, ta.reportedStatus.counters);
+ assertEquals(1.0f, ta.reportedStatus.progress, 1e-6);
+ assertEquals(state, ta.reportedStatus.state);
+ assertEquals(1, ta.getDiagnostics().size());
+ assertEquals(diag, ta.getDiagnostics().get(0));
+ assertEquals(state, recoveredState);
+ }
+
+ /**
+ * No any event to restore -> RecoverTransition
+ */
+ @Test
+ public void testTARecovery_NEW() {
+ ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+ assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+ verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+ }
+
+ /**
+ * restoreFromTAStartEvent -> RecoverTransition
+ */
+ @Test
+ public void testTARecovery_START() {
+ restoreFromTAStartEvent();
+
+ ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+ assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+ verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+ }
+
+ /**
+ * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEED)
+ * -> RecoverTransition
+ */
+ @Test
+ public void testTARecovery_SUCCEED() {
+ restoreFromTAStartEvent();
+ restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+
+ ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
+ verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+ }
+
+ /**
+ * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED)
+ * -> RecoverTransition
+ */
+ @Test
+ public void testTARecovery_KIILED() {
+ restoreFromTAStartEvent();
+ restoreFromTAFinishedEvent(TaskAttemptState.KILLED);
+
+ ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+ assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+ verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+ }
+
+ /**
+ * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED)
+ * -> RecoverTransition
+ */
+ @Test
+ public void testTARecovery_FAILED() {
+ restoreFromTAStartEvent();
+ restoreFromTAFinishedEvent(TaskAttemptState.FAILED);
+
+ ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+ assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
+ verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+ }
+
+ /**
+ * restoreFromTAFinishedEvent ( no TAStartEvent before TAFinishedEvent )
+ */
+ @Test
+ public void testRecover_FINISH_BUT_NO_START() {
+ try {
+ restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+ fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
+ } catch (Throwable e) {
+ assertEquals("Finished Event seen but"
+ + " no Started Event was encountered earlier", e.getMessage());
+ }
+ }
+}