You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/05/20 23:50:26 UTC
git commit: TEZ-111. Create tests for DAGImpl. (hitesh)
Updated Branches:
refs/heads/TEZ-1 be6d4bc09 -> 8298190dd
TEZ-111. Create tests for DAGImpl. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8298190d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8298190d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8298190d
Branch: refs/heads/TEZ-1
Commit: 8298190ddb513652ea65f72a5104f6b3b3320c86
Parents: be6d4bc
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 20 14:49:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 20 14:49:43 2013 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/event/DAGEventType.java | 17 +-
.../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 45 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 76 +-
.../apache/tez/dag/app/dag/impl/TestDAGImpl.java | 626 +++++++++++++++
.../tez/dag/app/dag/impl/TestVertexImpl.java | 167 +++-
5 files changed, 814 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 20f14b2..14c2f30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -30,24 +30,17 @@ public enum DAGEventType {
DAG_INIT,
DAG_START,
- //Producer:Task
- /*
- JOB_TASK_COMPLETED,
- JOB_MAP_TASK_RESCHEDULED,
- JOB_TASK_ATTEMPT_COMPLETED,
- */
-
//Producer: Vertex
- DAG_VERTEX_INITED,
- DAG_VERTEX_STARTED,
DAG_VERTEX_COMPLETED,
+
+ //Producer: TaskImpl
DAG_SCHEDULER_UPDATE,
-
- //Producer:Job
+
+ //Producer:Dag
DAG_COMPLETED,
//Producer:Any component
DAG_DIAGNOSTIC_UPDATE,
INTERNAL_ERROR,
- DAG_COUNTER_UPDATE,
+ DAG_COUNTER_UPDATE,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 66f9c36..5f47818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -117,7 +117,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final TaskAttemptListener taskAttemptListener;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private final Object tasksSyncHandle = new Object();
-
+
private DAGScheduler dagScheduler;
private final EventHandler eventHandler;
@@ -129,7 +129,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final AppContext appContext;
volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
- private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
+ private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
private TezCounters dagCounters = new TezCounters();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
@@ -150,7 +150,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
new CounterUpdateTransition();
- private static final DAGSchedulerUpdateTransition
+ private static final DAGSchedulerUpdateTransition
DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
protected static final
@@ -211,8 +211,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(DAGState.RUNNING, DAGState.RUNNING,
DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
- .addTransition(DAGState.RUNNING, DAGState.RUNNING,
- DAGEventType.DAG_SCHEDULER_UPDATE,
+ .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+ DAGEventType.DAG_SCHEDULER_UPDATE,
DAG_SCHEDULER_UPDATE_TRANSITION)
.addTransition(
DAGState.RUNNING,
@@ -332,7 +332,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.jobPlan = jobPlan;
this.conf = conf;
this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
-
+
this.userName = appUserName;
// TODO Metrics
//this.metrics = metrics;
@@ -374,7 +374,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public TezConfiguration getConf() {
return conf;
}
-
+
@Override
public DAGPlan getJobPlan() {
return jobPlan;
@@ -508,7 +508,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
readLock.unlock();
}
}
-
+
// monitoring apis
@Override
public DAGStatusBuilder getDAGStatus() {
@@ -552,11 +552,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
return vertex.getVertexStatus();
}
-
-
+
+
protected void startRootVertices() {
for (Vertex v : vertices.values()) {
if (v.getInputVerticesCount() == 0) {
+ LOG.info("DEBUG: Starting root vertex " + v.getName());
eventHandler.handle(new VertexEvent(v.getVertexId(),
VertexEventType.V_START));
}
@@ -605,7 +606,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
writeLock.unlock();
}
}
-
+
@Private
public DAGState getInternalState() {
readLock.lock();
@@ -727,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public String getUserName() {
return userName;
}
-
+
@Override
public String getQueueName() {
return queueName;
@@ -862,7 +863,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
-
+
// setup the dag
for (Vertex v : dag.vertices.values()) {
parseVertexEdges(dag, edgePlans, v);
@@ -888,10 +889,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
-
+
VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
-
+
return new VertexImpl(
vertexId, vertexPlan, vertexName, dag.conf,
dag.eventHandler, dag.taskAttemptListener,
@@ -912,18 +913,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
for(String inEdgeId : vertexPlan.getInEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(inEdgeId);
- Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+ Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
EdgeProperty edgeProp = dag.edges.get(inEdgeId);
inVertices.put(inVertex, edgeProp);
}
-
+
for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(outEdgeId);
- Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+ Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
EdgeProperty edgeProp = dag.edges.get(outEdgeId);
outVertices.put(outVertex, edgeProp);
}
-
+
vertex.setInputVertices(inVertices);
vertex.setOutputVertices(outVertices);
}
@@ -931,9 +932,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
protected void setup(DAGImpl job) throws IOException {
job.initTime = job.clock.getTime();
String dagIdString = job.dagId.toString();
-
+
dagIdString.replace("application", "job");
-
+
// TODO remove - TEZ-71
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
@@ -1165,7 +1166,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
}
-
+
private static class DAGSchedulerUpdateTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 20d13cd..ce4c609 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -141,7 +141,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODO Metrics
//private final MRAppMetrics metrics;
private final AppContext appContext;
-
+
private boolean lazyTasksCopyNeeded = false;
volatile Map<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
private Object fullCountersLock = new Object();
@@ -154,7 +154,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private int numStartedSourceVertices = 0;
private int distanceFromRoot = 0;
-
+
private List<TezDependentTaskCompletionEvent> sourceTaskAttemptCompletionEvents;
private final List<String> diagnostics = new ArrayList<String>();
@@ -167,7 +167,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
List<InputSpec> inputSpecList;
List<OutputSpec> outputSpecList;
-
+
private static final InternalErrorTransition
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final TaskAttemptCompletedEventTransition
@@ -200,7 +200,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITED, VertexState.INITED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
- .addTransition(VertexState.INITED, VertexState.RUNNING,
+ .addTransition(VertexState.INITED, VertexState.RUNNING,
VertexEventType.V_START,
new StartTransition())
@@ -220,7 +220,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(VertexState.RUNNING,
- EnumSet.of(VertexState.RUNNING, VertexState.KILLED,
+ EnumSet.of(VertexState.RUNNING,
VertexState.SUCCEEDED, VertexState.FAILED),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
@@ -335,14 +335,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Token<JobTokenIdentifier> jobToken;
private final TezVertexID vertexId; //runtime assigned id.
- private final VertexPlan vertexPlan;
+ private final VertexPlan vertexPlan;
private final String vertexName;
private final String processorName;
private Map<Vertex, EdgeProperty> sourceVertices;
private Map<Vertex, EdgeProperty> targetVertices;
-
+
private VertexScheduler vertexScheduler;
private VertexOutputCommitter committer;
@@ -351,8 +351,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
private final String javaOpts;
-
- public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
+
+ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, TezConfiguration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
@@ -384,13 +384,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.jobToken = jobToken;
this.committer = new NullVertexOutputCommitter();
this.vertexLocationHint = vertexLocationHint;
-
+
this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
- this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;
+ this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;
this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList());
this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList());
this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
-
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -404,12 +404,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public TezVertexID getVertexId() {
return vertexId;
}
-
+
@Override
public VertexPlan getVertexPlan() {
return vertexPlan;
}
-
+
@Override
public int getDistanceFromRoot() {
return distanceFromRoot;
@@ -521,7 +521,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.readLock.unlock();
}
}
-
+
@Override
public ProgressBuilder getVertexProgress() {
this.readLock.lock();
@@ -537,7 +537,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.readLock.unlock();
}
}
-
+
@Override
public VertexStatusBuilder getVertexStatus() {
this.readLock.lock();
@@ -631,7 +631,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
writeLock.unlock();
}
}
-
+
private VertexState getInternalState() {
readLock.lock();
try {
@@ -710,7 +710,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.abortVertex(VertexStatus.State.FAILED);
return vertex.finished(VertexState.FAILED);
}
-
+
if(vertex.succeededTaskCount == vertex.tasks.size()) {
try {
if (!vertex.committed.getAndSet(true)) {
@@ -721,9 +721,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
return vertex.finished(VertexState.FAILED);
}
- return vertex.finished(VertexState.SUCCEEDED);
+ return vertex.finished(VertexState.SUCCEEDED);
}
-
+
if (vertex.completedTaskCount == vertex.tasks.size()) {
// this means the vertex has some killed tasks
assert vertex.killedTaskCount > 0;
@@ -731,9 +731,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.abortVertex(VertexStatus.State.KILLED);
return vertex.finished(VertexState.KILLED);
}
-
+
//return the current state, Vertex not finished yet
- return vertex.getInternalState();
+ return vertex.getInternalState();
}
VertexState finished(VertexState finalState) {
@@ -774,9 +774,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODODAGAM
// TODO: Splits?
-
+
vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
-
+
/*
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
@@ -794,8 +794,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// create the Tasks but don't start them yet
createTasks(vertex);
-
-
+
+
boolean hasBipartite = false;
if (vertex.sourceVertices != null) {
@@ -806,10 +806,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
if (hasBipartite) {
// setup vertex scheduler
- // TODO this needs to consider data size and perhaps API.
+ // TODO this needs to consider data size and perhaps API.
// Currently implicitly BIPARTITE is the only edge type
vertex.vertexScheduler = new BipartiteSlowStartVertexScheduler(
vertex,
@@ -907,7 +907,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
- VertexEventSourceVertexStarted startEvent =
+ VertexEventSourceVertexStarted startEvent =
(VertexEventSourceVertexStarted) event;
int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
if(vertex.distanceFromRoot < distanceFromRoot) {
@@ -916,8 +916,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.numStartedSourceVertices++;
if (vertex.numStartedSourceVertices == vertex.sourceVertices.size()) {
// Consider inlining this.
- LOG.info("Starting vertex: " + vertex.getVertexId() +
- " with name: " + vertex.getName() +
+ LOG.info("Starting vertex: " + vertex.getVertexId() +
+ " with name: " + vertex.getName() +
" with distanceFromRoot: " + vertex.distanceFromRoot );
vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
VertexEventType.V_START));
@@ -1040,10 +1040,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
//eventId is equal to index in the arraylist
tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size());
vertex.sourceTaskAttemptCompletionEvents.add(tce);
- // TODO this needs to be ordered/grouped by source vertices or else
- // my tasks will not know which events are for which vertices' tasks. This
+ // TODO this needs to be ordered/grouped by source vertices or else
+ // my tasks will not know which events are for which vertices' tasks. This
// differentiation was not needed for MR because there was only 1 M stage.
- // if the tce is sent to the task then a solution could be to add vertex
+ // if the tce is sent to the task then a solution could be to add vertex
// name to the tce
// need to send vertex name and task index in that vertex
@@ -1060,7 +1060,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
}
-
+
vertex.vertexScheduler.onSourceTaskCompleted(attemptId);
}
}
@@ -1073,7 +1073,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TezDependentTaskCompletionEvent tce =
((VertexEventTaskAttemptCompleted) event).getCompletionEvent();
- // TODO this should only be sent for successful events? looks like all
+ // TODO this should only be sent for successful events? looks like all
// need to be sent in the existing shuffle code
// Notify all target vertices
if (vertex.targetVertices != null) {
@@ -1245,7 +1245,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public TezDAGID getDAGId() {
return getDAG().getID();
}
-
+
@Override
public ApplicationAttemptId getApplicationAttemptId() {
return appContext.getApplicationAttemptId();
@@ -1259,7 +1259,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public DAG getDAG() {
return appContext.getDAG();
}
-
+
@VisibleForTesting
String getProcessorName() {
return this.processorName;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
new file mode 100644
index 0000000..3adcd8b
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeConnectionPattern;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestDAGImpl {
+
+ private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
+ private DAGPlan dagPlan;
+ private TezDAGID dagId;
+ private TezConfiguration conf;
+ private DrainDispatcher dispatcher;
+ private Credentials fsTokens;
+ private AppContext appContext;
+ private ApplicationAttemptId appAttemptId;
+ private DAGImpl dag;
+ private VertexEventDispatcher vertexEventDispatcher;
+ private DagEventDispatcher dagEventDispatcher;
+ private TaskAttemptListener taskAttemptListener;
+ private TaskHeartbeatHandler thh;
+ private Clock clock = new SystemClock();
+ private JobTokenSecretManager jobTokenSecretManager;
+ private DAGFinishEventHandler dagFinishEventHandler;
+
+ /** Forwards every dispatched {@link DAGEvent} to the DAG under test. */
+ private class DagEventDispatcher implements EventHandler<DAGEvent> {
+   @Override
+   public void handle(DAGEvent dagEvent) {
+     // Reads the enclosing test's "dag" field at call time.
+     dag.handle(dagEvent);
+   }
+ }
+
+ /**
+  * No-op handler for {@link DAGHistoryEvent}s.
+  *
+  * Declared static because it never touches the enclosing test instance.
+  */
+ private static class HistoryHandler implements EventHandler<DAGHistoryEvent> {
+   @Override
+   public void handle(DAGHistoryEvent event) {
+     // Intentionally empty: history events are discarded.
+   }
+ }
+
+ /**
+  * No-op handler for {@link TaskEvent}s.
+  *
+  * Declared static because it never touches the enclosing test instance.
+  */
+ private static class TaskEventHandler implements EventHandler<TaskEvent> {
+   @Override
+   public void handle(TaskEvent event) {
+     // Intentionally empty: task events are discarded.
+   }
+ }
+
+ /**
+  * Routes each {@link VertexEvent} to the vertex it names, looked up in the
+  * DAG under test.
+  */
+ private class VertexEventDispatcher
+     implements EventHandler<VertexEvent> {
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public void handle(VertexEvent event) {
+     // The Vertex is cast to EventHandler so the event can be delivered to it
+     // directly; the cast is unchecked, hence the suppression.
+     EventHandler<VertexEvent> target =
+         (EventHandler<VertexEvent>) dag.getVertex(event.getVertexId());
+     target.handle(event);
+   }
+ }
+
+ /** Counts how many {@link DAGFinishEvent}s have been delivered. */
+ private class DAGFinishEventHandler
+     implements EventHandler<DAGFinishEvent> {
+   /** Number of finish events seen so far. */
+   public int dagFinishEvents = 0;
+
+   @Override
+   public void handle(DAGFinishEvent event) {
+     dagFinishEvents += 1;
+   }
+ }
+
+ /**
+  * Builds the six-vertex plan used by every test in this class.
+  *
+  * Topology (all edges BIPARTITE with a STABLE source):
+  *   vertex1 --e1--> vertex3, vertex2 --e2--> vertex3,
+  *   vertex3 --e3--> vertex4, vertex3 --e4--> vertex5,
+  *   vertex4 --e5--> vertex6, vertex5 --e6--> vertex6.
+  *
+  * vertex1 has one task; every other vertex has two. Only vertex3 sets a
+  * processor name and non-empty java opts.
+  *
+  * @return the assembled plan
+  */
+ private DAGPlan createTestDAGPlan() {
+   LOG.info("Setting up dag plan");
+   return DAGPlan.newBuilder()
+       .setName("testverteximpl")
+       // vertex1: root, single task, feeds vertex3 through e1
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex1")
+               .setType(PlanVertexType.NORMAL)
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host1")
+                       .addRack("rack1")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(1)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("")
+                       .setTaskModule("x1.y1")
+                       .build())
+               .addOutEdgeId("e1")
+               .build())
+       // vertex2: root, two tasks, feeds vertex3 through e2
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex2")
+               .setType(PlanVertexType.NORMAL)
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host2")
+                       .addRack("rack2")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(2)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("")
+                       .setTaskModule("x2.y2")
+                       .build())
+               .addOutEdgeId("e2")
+               .build())
+       // vertex3: joins e1 and e2, fans out to vertex4 (e3) and vertex5 (e4)
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex3")
+               .setType(PlanVertexType.NORMAL)
+               .setProcessorName("x3.y3")
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host3")
+                       .addRack("rack3")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(2)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("foo")
+                       .setTaskModule("x3.y3")
+                       .build())
+               .addInEdgeId("e1")
+               .addInEdgeId("e2")
+               .addOutEdgeId("e3")
+               .addOutEdgeId("e4")
+               .build())
+       // vertex4: consumes e3, feeds vertex6 through e5
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex4")
+               .setType(PlanVertexType.NORMAL)
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host4")
+                       .addRack("rack4")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(2)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("")
+                       .setTaskModule("x4.y4")
+                       .build())
+               .addInEdgeId("e3")
+               .addOutEdgeId("e5")
+               .build())
+       // vertex5: consumes e4, feeds vertex6 through e6
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex5")
+               .setType(PlanVertexType.NORMAL)
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host5")
+                       .addRack("rack5")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(2)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("")
+                       .setTaskModule("x5.y5")
+                       .build())
+               .addInEdgeId("e4")
+               .addOutEdgeId("e6")
+               .build())
+       // vertex6: sink, joins e5 and e6
+       .addVertex(
+           VertexPlan.newBuilder()
+               .setName("vertex6")
+               .setType(PlanVertexType.NORMAL)
+               .addTaskLocationHint(
+                   PlanTaskLocationHint.newBuilder()
+                       .addHost("host6")
+                       .addRack("rack6")
+                       .build())
+               .setTaskConfig(
+                   PlanTaskConfiguration.newBuilder()
+                       .setNumTasks(2)
+                       .setVirtualCores(4)
+                       .setMemoryMb(1024)
+                       .setJavaOpts("")
+                       .setTaskModule("x6.y6")
+                       .build())
+               .addInEdgeId("e5")
+               .addInEdgeId("e6")
+               .build())
+       // e1: vertex1 -> vertex3
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i3_v1")
+               .setInputVertexName("vertex1")
+               .setOutputClass("o1")
+               .setOutputVertexName("vertex3")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e1")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       // e2: vertex2 -> vertex3
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i3_v2")
+               .setInputVertexName("vertex2")
+               .setOutputClass("o2")
+               .setOutputVertexName("vertex3")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e2")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       // e3: vertex3 -> vertex4
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i4_v3")
+               .setInputVertexName("vertex3")
+               .setOutputClass("o3_v4")
+               .setOutputVertexName("vertex4")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e3")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       // e4: vertex3 -> vertex5
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i5_v3")
+               .setInputVertexName("vertex3")
+               .setOutputClass("o3_v5")
+               .setOutputVertexName("vertex5")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e4")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       // e5: vertex4 -> vertex6
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i6_v4")
+               .setInputVertexName("vertex4")
+               .setOutputClass("o4")
+               .setOutputVertexName("vertex6")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e5")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       // e6: vertex5 -> vertex6
+       .addEdge(
+           EdgePlan.newBuilder()
+               .setInputClass("i6_v5")
+               .setInputVertexName("vertex5")
+               .setOutputClass("o5")
+               .setOutputVertexName("vertex6")
+               .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+               .setId("e6")
+               .setSourceType(PlanEdgeSourceType.STABLE)
+               .build())
+       .build();
+ }
+
+ /**
+  * Creates a fresh DAGImpl from the test plan and wires a started
+  * DrainDispatcher with handlers for vertex, DAG, history, DAG-finish and
+  * task events.
+  *
+  * NOTE(review): taskAttemptListener and thh are not assigned anywhere in the
+  * visible part of this class, so they appear to be passed to DAGImpl as
+  * null - confirm that is intended.
+  */
+ @Before
+ public void setup() {
+   // Identity and plan for the DAG under test.
+   conf = new TezConfiguration();
+   appAttemptId = BuilderUtils.newApplicationAttemptId(
+       BuilderUtils.newApplicationId(100, 1), 1);
+   dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+   Assert.assertNotNull(dagId);
+   dagPlan = createTestDAGPlan();
+
+   // Collaborators handed to the DAGImpl constructor.
+   dispatcher = new DrainDispatcher();
+   fsTokens = new Credentials();
+   jobTokenSecretManager = new JobTokenSecretManager();
+   appContext = mock(AppContext.class);
+   doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+   doReturn(dagId).when(appContext).getDAGID();
+
+   dag = new DAGImpl(dagId, appAttemptId, conf, dagPlan,
+       dispatcher.getEventHandler(), taskAttemptListener,
+       jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext);
+   // The context can only hand out the DAG once it exists.
+   doReturn(dag).when(appContext).getDAG();
+
+   // Event handlers, registered in the same order as before.
+   vertexEventDispatcher = new VertexEventDispatcher();
+   dagEventDispatcher = new DagEventDispatcher();
+   dagFinishEventHandler = new DAGFinishEventHandler();
+   dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+   dispatcher.register(DAGEventType.class, dagEventDispatcher);
+   dispatcher.register(HistoryEventType.class, new HistoryHandler());
+   dispatcher.register(DAGFinishEvent.Type.class, dagFinishEventHandler);
+   dispatcher.register(TaskEventType.class, new TaskEventHandler());
+
+   dispatcher.init(conf);
+   dispatcher.start();
+ }
+
+ /**
+  * Drains and stops the dispatcher, then drops the per-test DAG state.
+  *
+  * The dispatcher is drained first because DagEventDispatcher and
+  * VertexEventDispatcher dereference the "dag" field; clearing it while
+  * events are still queued would make those handlers hit a null DAG.
+  */
+ @After
+ public void teardown() {
+   dispatcher.await();
+   dispatcher.stop();
+   dagPlan = null;
+   dag = null;
+ }
+
+ /**
+  * Sends DAG_INIT straight to the given DAG and checks it reports INITED.
+  *
+  * @param dag the DAG to initialize
+  */
+ private void initDAG(DAGImpl dag) {
+   DAGEvent initEvent = new DAGEvent(dagId, DAGEventType.DAG_INIT);
+   dag.handle(initEvent);
+   Assert.assertEquals(DAGState.INITED, dag.getState());
+ }
+
+ /**
+  * Sends DAG_START straight to the given DAG and checks it reports RUNNING.
+  *
+  * @param dag the DAG to start; callers in this class initialize it first
+  */
+ private void startDAG(DAGImpl dag) {
+   DAGEvent startEvent = new DAGEvent(dagId, DAGEventType.DAG_START);
+   dag.handle(startEvent);
+   Assert.assertEquals(DAGState.RUNNING, dag.getState());
+ }
+
+ /** Initializing the DAG must expose all six vertices of the test plan. */
+ @Test
+ public void testDAGInit() {
+   final int expectedVertices = 6;
+   initDAG(dag);
+   Assert.assertEquals(expectedVertices, dag.getTotalVertices());
+ }
+
+ /**
+  * After start, every vertex must be RUNNING and report the expected
+  * distance from the root vertices.
+  */
+ @Test
+ public void testDAGStart() {
+   initDAG(dag);
+   startDAG(dag);
+   dispatcher.await();
+
+   // Expected distance from root, indexed by vertex id 0..5.
+   final int[] expectedDistances = {0, 0, 1, 2, 2, 3};
+   for (int idx = 0; idx < expectedDistances.length; idx++) {
+     Vertex vertex = dag.getVertex(new TezVertexID(dagId, idx));
+     Assert.assertEquals(VertexState.RUNNING, vertex.getState());
+     Assert.assertEquals(expectedDistances[idx], vertex.getDistanceFromRoot());
+   }
+
+   // Log the observed distances for all six vertices.
+   for (int idx = 0; idx < 6; idx++) {
+     Vertex vertex = dag.getVertex(new TezVertexID(dagId, idx));
+     LOG.info("Distance from root: v" + idx + ":"
+         + vertex.getDistanceFromRoot());
+   }
+ }
+
+ /**
+  * Completing tasks 0 and 1 of the vertex with id 1 must move that vertex to
+  * SUCCEEDED and raise the DAG's successful-vertex count to one.
+  */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testVertexCompletion() {
+   initDAG(dag);
+   startDAG(dag);
+   dispatcher.await();
+
+   TezVertexID vertexId = new TezVertexID(dagId, 1);
+   Vertex vertex = dag.getVertex(vertexId);
+   EventHandler<VertexEvent> vertexHandler = (EventHandler<VertexEvent>) vertex;
+   // Report task 0 and then task 1 as succeeded, directly on the vertex.
+   for (int taskIdx = 0; taskIdx < 2; taskIdx++) {
+     vertexHandler.handle(new VertexEventTaskCompleted(
+         new TezTaskID(vertexId, taskIdx), TaskState.SUCCEEDED));
+   }
+   dispatcher.await();
+
+   Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
+   Assert.assertEquals(1, dag.getSuccessfulVertices());
+ }
+
+ /**
+  * Kills a DAG right after start and expects the DAG and all six vertices to
+  * be KILLED.
+  *
+  * This method had no @Test annotation, so JUnit 4 never ran it. It is now
+  * registered but ignored so that it shows up as skipped instead of being
+  * silently dead code.
+  *
+  * NOTE(review): testKillRunningDAG expects KILL_WAIT (not KILLED) for the
+  * DAG and for untouched running vertices after DAG_KILL, which suggests
+  * these assertions would fail if enabled - confirm the intended end state
+  * before removing @Ignore.
+  */
+ @Test
+ @Ignore
+ public void testKillStartedDAG() {
+   initDAG(dag);
+   startDAG(dag);
+   dispatcher.await();
+
+   dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+   dispatcher.await();
+
+   Assert.assertEquals(DAGState.KILLED, dag.getState());
+   for (int i = 0; i < 6; ++i) {
+     TezVertexID vId = new TezVertexID(dagId, i);
+     Vertex v = dag.getVertex(vId);
+     Assert.assertEquals(VertexState.KILLED, v.getState());
+   }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testKillRunningDAG() {
+ initDAG(dag);
+ startDAG(dag);
+ dispatcher.await();
+
+ TezVertexID vId1 = new TezVertexID(dagId, 1);
+ Vertex v1 = dag.getVertex(vId1);
+ ((EventHandler<VertexEvent>) v1).handle(new VertexEventTaskCompleted(
+ new TezTaskID(vId1, 0), TaskState.SUCCEEDED));
+ TezVertexID vId0 = new TezVertexID(dagId, 0);
+ Vertex v0 = dag.getVertex(vId0);
+ ((EventHandler<VertexEvent>) v0).handle(new VertexEventTaskCompleted(
+ new TezTaskID(vId0, 0), TaskState.SUCCEEDED));
+ dispatcher.await();
+
+ Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+
+ dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.await();
+
+ Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+ Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
+ Assert.assertEquals(VertexState.KILL_WAIT, v1.getState());
+ for (int i = 2 ; i < 6; ++i ) {
+ TezVertexID vId = new TezVertexID(dagId, i);
+ Vertex v = dag.getVertex(vId);
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ }
+ Assert.assertEquals(1, dag.getSuccessfulVertices());
+ }
+
+ /**
+  * Sends DAG_START to a DAG that was never initialized and expects the DAG
+  * to move to the ERROR state.
+  */
+ @Test
+ public void testInvalidEvent() {
+   final DAGEvent prematureStart = new DAGEvent(dagId, DAGEventType.DAG_START);
+   dag.handle(prematureStart);
+   dispatcher.await();
+   Assert.assertEquals(DAGState.ERROR, dag.getState());
+ }
+
+ /**
+  * Currently ignored. Reports vertex 0 as SUCCEEDED six times and expects it
+  * to be counted only once; then reports vertices 1..5 as SUCCEEDED and
+  * expects the DAG to succeed with six successful vertices.
+  */
+ @Test
+ @Ignore
+ public void testVertexSuccessfulCompletionUpdates() {
+   initDAG(dag);
+   startDAG(dag);
+   dispatcher.await();
+
+   // The same completion is deliberately delivered repeatedly.
+   for (int repeat = 0; repeat < 6; ++repeat) {
+     dag.handle(new DAGEventVertexCompleted(
+         new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+   }
+   dispatcher.await();
+   Assert.assertEquals(DAGState.RUNNING, dag.getState());
+   Assert.assertEquals(1, dag.getSuccessfulVertices());
+
+   for (int vertexIdx = 1; vertexIdx <= 5; ++vertexIdx) {
+     dag.handle(new DAGEventVertexCompleted(
+         new TezVertexID(dagId, vertexIdx), VertexState.SUCCEEDED));
+   }
+   dispatcher.await();
+   Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
+   Assert.assertEquals(6, dag.getSuccessfulVertices());
+ }
+
+ @Test
+ @Ignore
+ public void testVertexFailureHandling() {
+ initDAG(dag);
+ startDAG(dag);
+ dispatcher.await();
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 2), VertexState.FAILED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.FAILED, dag.getState());
+ Assert.assertEquals(2, dag.getSuccessfulVertices());
+
+ // Expect running vertices to be killed on first failure
+ for (int i = 3; i < 6; ++i) {
+ TezVertexID vId = new TezVertexID(dagId, i);
+ Vertex v = dag.getVertex(vId);
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ }
+ }
+
+ @Test
+ @Ignore
+ public void testDAGKill() {
+ initDAG(dag);
+ startDAG(dag);
+ dispatcher.await();
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+
+ for (int i = 2; i < 6; ++i) {
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+ }
+ dispatcher.await();
+ Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(6, dag.getSuccessfulVertices());
+ Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+ }
+
+ @Test
+ public void testDAGKillPending() {
+ initDAG(dag);
+ startDAG(dag);
+ dispatcher.await();
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+
+ for (int i = 2; i < 5; ++i) {
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+ }
+ dispatcher.await();
+ Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+
+ dag.handle(new DAGEventVertexCompleted(
+ new TezVertexID(dagId, 5), VertexState.KILLED));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(5, dag.getSuccessfulVertices());
+ Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+ }
+
+ // Placeholder for DAG diagnostics coverage. The body is empty, so this
+ // test currently passes without asserting anything.
+ @Test
+ public void testDiagnosticUpdates() {
+ // FIXME need to implement
+ }
+
+ // Placeholder for DAG counter coverage. The body is empty, so this test
+ // currently passes without asserting anything.
+ @Test
+ public void testCounterUpdates() {
+ // FIXME need to implement
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fe6ae14..f78a7b8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -62,6 +62,8 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -115,7 +117,7 @@ public class TestVertexImpl {
public int abortCounter = 0;
private boolean throwError;
private boolean throwErrorOnAbort;
-
+
public CountingVertexOutputCommitter(boolean throwError,
boolean throwOnAbort) {
this.throwError = throwError;
@@ -125,7 +127,7 @@ public class TestVertexImpl {
public CountingVertexOutputCommitter() {
this(false, false);
}
-
+
@Override
public void init(VertexContext context) throws IOException {
++initCounter;
@@ -150,12 +152,26 @@ public class TestVertexImpl {
if (throwErrorOnAbort) {
throw new IOException("I can throwz exceptions in abort");
}
- }
+ }
}
-
+
+ // No-op handler for TaskEvent: accepts every event and does nothing with it.
+ private class TaskEventHandler implements EventHandler<TaskEvent> {
+ @Override
+ public void handle(TaskEvent event) {
+ // Intentionally empty: task events are discarded.
+ }
+ }
+
+ // DAG event handler that only counts how many events of each DAGEventType
+ // it has received, so tests can assert on those counts.
private class DagEventDispatcher implements EventHandler<DAGEvent> {
+ // Number of events seen so far, keyed by event type. A type that has never
+ // been seen has no entry, so get() returns null for it.
+ public Map<DAGEventType, Integer> eventCount =
+ new HashMap<DAGEventType, Integer>();
+
@Override
public void handle(DAGEvent event) {
+ // Start at 1 for a first occurrence, otherwise bump the stored count.
+ int count = 1;
+ if (eventCount.containsKey(event.getType())) {
+ count = eventCount.get(event.getType()) + 1;
+ }
+ eventCount.put(event.getType(), count);
}
}
@@ -164,7 +180,7 @@ public class TestVertexImpl {
public void handle(DAGHistoryEvent event) {
}
}
-
+
private class VertexEventDispatcher
implements EventHandler<VertexEvent> {
@@ -421,7 +437,7 @@ public class TestVertexImpl {
Map<Vertex, EdgeProperty> outVertices =
new HashMap<Vertex, EdgeProperty>();
-
+
for(String inEdgeId : vertexPlan.getInEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(inEdgeId);
Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName());
@@ -469,6 +485,7 @@ public class TestVertexImpl {
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(HistoryEventType.class,
new HistoryHandler());
+ dispatcher.register(TaskEventType.class, new TaskEventHandler());
dispatcher.init(conf);
dispatcher.start();
}
@@ -500,10 +517,10 @@ public class TestVertexImpl {
if (checkKillWait) {
Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
} else {
- Assert.assertEquals(VertexState.KILLED, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
}
}
-
+
private void startVertex(VertexImpl v,
boolean checkRunningState) {
Assert.assertEquals(VertexState.INITED, v.getState());
@@ -522,10 +539,10 @@ public class TestVertexImpl {
VertexImpl v3 = vertices.get("vertex3");
initVertex(v3);
-
+
Assert.assertEquals("x3.y3", v3.getProcessorName());
Assert.assertEquals("foo", v3.getJavaOpts());
-
+
Assert.assertEquals(2, v3.getInputSpecList().size());
Assert.assertEquals(2, v3.getInputVerticesCount());
Assert.assertEquals(2, v3.getOutputVerticesCount());
@@ -547,7 +564,7 @@ public class TestVertexImpl {
.getInputClassName())
|| "i3_v2".equals(v3.getInputSpecList().get(1)
.getInputClassName()));
-
+
Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
.getVertexName())
|| "vertex5".equals(v3.getOutputSpecList().get(0)
@@ -606,7 +623,7 @@ public class TestVertexImpl {
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
-
+
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
@@ -632,7 +649,7 @@ public class TestVertexImpl {
StringUtils.join(",", v.getDiagnostics()).toLowerCase();
Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
}
-
+
@Test
public void testVertexWithNoTasks() {
// FIXME a vertex with no tasks should not be allowed
@@ -644,14 +661,14 @@ public class TestVertexImpl {
}
@Test
- public void testVertexKill() {
+ public void testVertexKillDiagnostics() {
VertexImpl v1 = vertices.get("vertex1");
killVertex(v1, false);
String diagnostics =
StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
Assert.assertTrue(diagnostics.contains(
"vertex received kill in new state"));
-
+
VertexImpl v2 = vertices.get("vertex2");
initVertex(v2);
killVertex(v2, false);
@@ -672,30 +689,73 @@ public class TestVertexImpl {
}
@Test
- public void testKilledTasksHandling() {
+ public void testVertexKillPending() {
VertexImpl v = vertices.get("vertex2");
initVertex(v);
+ VertexImpl v3 = vertices.get("vertex3");
+ initVertex(v3);
+
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+ v.handle(new VertexEventTaskCompleted(
+ new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
- v.handle(new VertexEventTaskCompleted(t1, TaskState.KILLED));
+ v.handle(new VertexEventTaskCompleted(
+ new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
dispatcher.await();
- Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
+ }
+
+ @Test
+ @Ignore
+ public void testVertexKill() {
+ VertexImpl v = vertices.get("vertex2");
+ initVertex(v);
+ VertexImpl v3 = vertices.get("vertex3");
+ initVertex(v3);
- v.handle(new VertexEventTaskCompleted(t2, TaskState.KILLED));
+ startVertex(v);
+
+ v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+ v.handle(new VertexEventTaskCompleted(
+ new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+ Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+ v.handle(new VertexEventTaskCompleted(
+ new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@Test
+ @Ignore
+ public void testKilledTasksHandling() {
+ VertexImpl v = vertices.get("vertex2");
+ initVertex(v);
+ startVertex(v);
+
+ TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+ TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+ v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState());
+ }
+
+ @Test
public void testVertexCommitterInit() {
VertexImpl v2 = vertices.get("vertex2");
initVertex(v2);
Assert.assertTrue(v2.getVertexOutputCommitter()
instanceof NullVertexOutputCommitter);
-
+
VertexImpl v6 = vertices.get("vertex6");
initVertex(v6);
Assert.assertTrue(v6.getVertexOutputCommitter()
@@ -708,13 +768,13 @@ public class TestVertexImpl {
initVertex(v2);
Assert.assertTrue(v2.getVertexScheduler()
instanceof ImmediateStartVertexScheduler);
-
+
VertexImpl v6 = vertices.get("vertex6");
initVertex(v6);
Assert.assertTrue(v6.getVertexScheduler()
instanceof BipartiteSlowStartVertexScheduler);
}
-
+
@Test
public void testVertexTaskFailure() {
VertexImpl v = vertices.get("vertex2");
@@ -730,7 +790,7 @@ public class TestVertexImpl {
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
-
+
v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
dispatcher.await();
@@ -765,9 +825,9 @@ public class TestVertexImpl {
public void testDiagnostics() {
// FIXME need to test diagnostics in various cases
}
-
+
@Test
- public void testTaskAttemptCompletionEvents() {
+ public void testTaskAttemptCompletionEvents() {
// FIXME need to test handling of task attempt events
}
@@ -821,7 +881,7 @@ public class TestVertexImpl {
v4.handle(new VertexEventTaskAttemptCompleted(cEvt3));
v5.handle(new VertexEventTaskAttemptCompleted(cEvt4));
v5.handle(new VertexEventTaskAttemptCompleted(cEvt5));
- v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));
+ v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));
v4.handle(new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED));
v4.handle(new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED));
@@ -850,7 +910,9 @@ public class TestVertexImpl {
v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
-
+ Assert.assertEquals(1,
+ dagEventDispatcher.eventCount.get(
+ DAGEventType.DAG_VERTEX_COMPLETED).intValue());
}
@Test
@@ -861,7 +923,7 @@ public class TestVertexImpl {
CountingVertexOutputCommitter committer =
new CountingVertexOutputCommitter();
v.setVertexOutputCommitter(committer);
-
+
startVertex(v);
TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -873,14 +935,14 @@ public class TestVertexImpl {
// v.handle(new VertexEventTaskReschedule(t1));
v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(0, committer.commitCounter);
-
+
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+ Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, committer.commitCounter);
-
+
}
@Test
@@ -890,7 +952,7 @@ public class TestVertexImpl {
CountingVertexOutputCommitter committer =
new CountingVertexOutputCommitter();
v.setVertexOutputCommitter(committer);
-
+
startVertex(v);
TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -900,23 +962,23 @@ public class TestVertexImpl {
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+ Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, committer.commitCounter);
-
+
v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+ Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, committer.commitCounter);
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(0, committer.initCounter); // already done in init
Assert.assertEquals(0, committer.setupCounter); // already done in init
}
-
+
@Test
public void testCommitterInitAndSetup() {
// FIXME need to add a test for this
}
-
+
@Test
public void testTaskAttemptFetchFailureHandling() {
// FIXME needs testing
@@ -929,7 +991,7 @@ public class TestVertexImpl {
CountingVertexOutputCommitter committer =
new CountingVertexOutputCommitter(true, true);
v.setVertexOutputCommitter(committer);
-
+
startVertex(v);
TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -938,13 +1000,28 @@ public class TestVertexImpl {
v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexState.FAILED, v.getState());
Assert.assertEquals(1, committer.commitCounter);
-
+
// FIXME need to verify whether abort needs to be called if commit fails
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(0, committer.initCounter); // already done in init
- Assert.assertEquals(0, committer.setupCounter); // already done in init
+ Assert.assertEquals(0, committer.setupCounter); // already done in init
+ }
+
+ // Placeholder: the body is empty, so this test currently passes without
+ // asserting anything.
+ @Test
+ public void testHistoryEventGeneration() {
+ }
+
+ // Sends V_START to vertex2 without initializing it first, then expects the
+ // vertex to be in ERROR and exactly one INTERNAL_ERROR DAG event to have
+ // been recorded by dagEventDispatcher.
+ @Test
+ public void testInvalidEvent() {
+ VertexImpl v = vertices.get("vertex2");
+ v.handle(new VertexEvent(v.getVertexId(),
+ VertexEventType.V_START));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.ERROR, v.getState());
+ // eventCount.get(...) returns null if no INTERNAL_ERROR was counted, in
+ // which case intValue() throws instead of failing the assertion.
+ Assert.assertEquals(1,
+ dagEventDispatcher.eventCount.get(
+ DAGEventType.INTERNAL_ERROR).intValue());
}
-
}