You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/05/20 23:50:26 UTC

git commit: TEZ-111. Create tests for DAGImpl. (hitesh)

Updated Branches:
  refs/heads/TEZ-1 be6d4bc09 -> 8298190dd


TEZ-111. Create tests for DAGImpl. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8298190d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8298190d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8298190d

Branch: refs/heads/TEZ-1
Commit: 8298190ddb513652ea65f72a5104f6b3b3320c86
Parents: be6d4bc
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 20 14:49:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 20 14:49:43 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/event/DAGEventType.java |   17 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |   45 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |   76 +-
 .../apache/tez/dag/app/dag/impl/TestDAGImpl.java   |  626 +++++++++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java       |  167 +++-
 5 files changed, 814 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 20f14b2..14c2f30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -30,24 +30,17 @@ public enum DAGEventType {
   DAG_INIT,
   DAG_START,
 
-  //Producer:Task
-  /*
-  JOB_TASK_COMPLETED,
-  JOB_MAP_TASK_RESCHEDULED,
-  JOB_TASK_ATTEMPT_COMPLETED,
-  */
-  
   //Producer: Vertex
-  DAG_VERTEX_INITED,
-  DAG_VERTEX_STARTED,
   DAG_VERTEX_COMPLETED,
+
+  //Producer: TaskImpl
   DAG_SCHEDULER_UPDATE,
-  
-  //Producer:Job
+
+  //Producer:Dag
   DAG_COMPLETED,
 
   //Producer:Any component
   DAG_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
-  DAG_COUNTER_UPDATE,  
+  DAG_COUNTER_UPDATE,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 66f9c36..5f47818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -117,7 +117,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final TaskAttemptListener taskAttemptListener;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
-  
+
   private DAGScheduler dagScheduler;
 
   private final EventHandler eventHandler;
@@ -129,7 +129,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final AppContext appContext;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
-  private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>(); 
+  private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
   private TezCounters dagCounters = new TezCounters();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
@@ -150,7 +150,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
       new CounterUpdateTransition();
-  private static final DAGSchedulerUpdateTransition 
+  private static final DAGSchedulerUpdateTransition
           DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
 
   protected static final
@@ -211,8 +211,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(DAGState.RUNNING, DAGState.RUNNING,
               DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(DAGState.RUNNING, DAGState.RUNNING, 
-              DAGEventType.DAG_SCHEDULER_UPDATE, 
+          .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+              DAGEventType.DAG_SCHEDULER_UPDATE,
               DAG_SCHEDULER_UPDATE_TRANSITION)
           .addTransition(
               DAGState.RUNNING,
@@ -332,7 +332,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.jobPlan = jobPlan;
     this.conf = conf;
     this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
-    
+
     this.userName = appUserName;
     // TODO Metrics
     //this.metrics = metrics;
@@ -374,7 +374,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   public TezConfiguration getConf() {
     return conf;
   }
-  
+
   @Override
   public DAGPlan getJobPlan() {
     return jobPlan;
@@ -508,7 +508,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       readLock.unlock();
     }
   }
-  
+
   // monitoring apis
   @Override
   public DAGStatusBuilder getDAGStatus() {
@@ -552,11 +552,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     return vertex.getVertexStatus();
   }
-  
-  
+
+
   protected void startRootVertices() {
     for (Vertex v : vertices.values()) {
       if (v.getInputVerticesCount() == 0) {
+        LOG.info("DEBUG: Starting root vertex " + v.getName());
         eventHandler.handle(new VertexEvent(v.getVertexId(),
             VertexEventType.V_START));
       }
@@ -605,7 +606,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       writeLock.unlock();
     }
   }
-  
+
   @Private
   public DAGState getInternalState() {
     readLock.lock();
@@ -727,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   public String getUserName() {
     return userName;
   }
-  
+
   @Override
   public String getQueueName() {
     return queueName;
@@ -862,7 +863,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
         dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
         Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
-        
+
         // setup the dag
         for (Vertex v : dag.vertices.values()) {
           parseVertexEdges(dag, edgePlans, v);
@@ -888,10 +889,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
       TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
-      
+
       VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
       VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
-        
+
       return new VertexImpl(
           vertexId, vertexPlan, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener,
@@ -912,18 +913,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
       for(String inEdgeId : vertexPlan.getInEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(inEdgeId);
-        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());    
+        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
         EdgeProperty edgeProp = dag.edges.get(inEdgeId);
         inVertices.put(inVertex, edgeProp);
       }
-      
+
       for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(outEdgeId);
-        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());    
+        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
         EdgeProperty edgeProp = dag.edges.get(outEdgeId);
         outVertices.put(outVertex, edgeProp);
       }
-      
+
       vertex.setInputVertices(inVertices);
       vertex.setOutputVertices(outVertices);
     }
@@ -931,9 +932,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     protected void setup(DAGImpl job) throws IOException {
       job.initTime = job.clock.getTime();
       String dagIdString = job.dagId.toString();
-      
+
       dagIdString.replace("application", "job");
-      
+
       // TODO remove - TEZ-71
       String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
@@ -1165,7 +1166,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     }
   }
-  
+
   private static class DAGSchedulerUpdateTransition implements
   SingleArcTransition<DAGImpl, DAGEvent> {
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 20d13cd..ce4c609 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -141,7 +141,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   // TODO Metrics
   //private final MRAppMetrics metrics;
   private final AppContext appContext;
-  
+
   private boolean lazyTasksCopyNeeded = false;
   volatile Map<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
   private Object fullCountersLock = new Object();
@@ -154,7 +154,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private int numStartedSourceVertices = 0;
   private int distanceFromRoot = 0;
-  
+
   private List<TezDependentTaskCompletionEvent> sourceTaskAttemptCompletionEvents;
   private final List<String> diagnostics = new ArrayList<String>();
 
@@ -167,7 +167,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   List<InputSpec> inputSpecList;
   List<OutputSpec> outputSpecList;
-  
+
   private static final InternalErrorTransition
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final TaskAttemptCompletedEventTransition
@@ -200,7 +200,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
-          .addTransition(VertexState.INITED, VertexState.RUNNING, 
+          .addTransition(VertexState.INITED, VertexState.RUNNING,
               VertexEventType.V_START,
               new StartTransition())
 
@@ -220,7 +220,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING, VertexState.KILLED,
+              EnumSet.of(VertexState.RUNNING,
                   VertexState.SUCCEEDED, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
@@ -335,14 +335,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Token<JobTokenIdentifier> jobToken;
 
   private final TezVertexID vertexId;  //runtime assigned id.
-  private final VertexPlan vertexPlan;  
+  private final VertexPlan vertexPlan;
 
   private final String vertexName;
   private final String processorName;
 
   private Map<Vertex, EdgeProperty> sourceVertices;
   private Map<Vertex, EdgeProperty> targetVertices;
-  
+
   private VertexScheduler vertexScheduler;
 
   private VertexOutputCommitter committer;
@@ -351,8 +351,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
   private final String javaOpts;
-  
-  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, 
+
+  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, TezConfiguration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
@@ -384,13 +384,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.jobToken = jobToken;
     this.committer = new NullVertexOutputCommitter();
     this.vertexLocationHint = vertexLocationHint;
-    
+
     this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
-    this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;  
+    this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;
     this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList());
     this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList());
     this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
-    
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -404,12 +404,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public TezVertexID getVertexId() {
     return vertexId;
   }
-  
+
   @Override
   public VertexPlan getVertexPlan() {
     return vertexPlan;
   }
-  
+
   @Override
   public int getDistanceFromRoot() {
     return distanceFromRoot;
@@ -521,7 +521,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       this.readLock.unlock();
     }
   }
-  
+
   @Override
   public ProgressBuilder getVertexProgress() {
     this.readLock.lock();
@@ -537,7 +537,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       this.readLock.unlock();
     }
   }
-  
+
   @Override
   public VertexStatusBuilder getVertexStatus() {
     this.readLock.lock();
@@ -631,7 +631,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       writeLock.unlock();
     }
   }
-  
+
   private VertexState getInternalState() {
     readLock.lock();
     try {
@@ -710,7 +710,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.abortVertex(VertexStatus.State.FAILED);
       return vertex.finished(VertexState.FAILED);
     }
-    
+
     if(vertex.succeededTaskCount == vertex.tasks.size()) {
       try {
         if (!vertex.committed.getAndSet(true)) {
@@ -721,9 +721,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
         return vertex.finished(VertexState.FAILED);
       }
-      return vertex.finished(VertexState.SUCCEEDED);      
+      return vertex.finished(VertexState.SUCCEEDED);
     }
-    
+
     if (vertex.completedTaskCount == vertex.tasks.size()) {
       // this means the vertex has some killed tasks
       assert vertex.killedTaskCount > 0;
@@ -731,9 +731,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.abortVertex(VertexStatus.State.KILLED);
       return vertex.finished(VertexState.KILLED);
     }
-    
+
     //return the current state, Vertex not finished yet
-    return vertex.getInternalState();    
+    return vertex.getInternalState();
   }
 
   VertexState finished(VertexState finalState) {
@@ -774,9 +774,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         // TODODAGAM
         // TODO: Splits?
-        
+
         vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
-        
+
         /*
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
@@ -794,8 +794,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         // create the Tasks but don't start them yet
         createTasks(vertex);
-        
-        
+
+
 
         boolean hasBipartite = false;
         if (vertex.sourceVertices != null) {
@@ -806,10 +806,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
           }
         }
-        
+
         if (hasBipartite) {
           // setup vertex scheduler
-          // TODO this needs to consider data size and perhaps API. 
+          // TODO this needs to consider data size and perhaps API.
           // Currently implicitly BIPARTITE is the only edge type
           vertex.vertexScheduler = new BipartiteSlowStartVertexScheduler(
               vertex,
@@ -907,7 +907,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventSourceVertexStarted startEvent = 
+      VertexEventSourceVertexStarted startEvent =
                                       (VertexEventSourceVertexStarted) event;
       int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
       if(vertex.distanceFromRoot < distanceFromRoot) {
@@ -916,8 +916,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.numStartedSourceVertices++;
       if (vertex.numStartedSourceVertices == vertex.sourceVertices.size()) {
         // Consider inlining this.
-        LOG.info("Starting vertex: " + vertex.getVertexId() + 
-                 " with name: " + vertex.getName() + 
+        LOG.info("Starting vertex: " + vertex.getVertexId() +
+                 " with name: " + vertex.getName() +
                  " with distanceFromRoot: " + vertex.distanceFromRoot );
         vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
             VertexEventType.V_START));
@@ -1040,10 +1040,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       //eventId is equal to index in the arraylist
       tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size());
       vertex.sourceTaskAttemptCompletionEvents.add(tce);
-      // TODO this needs to be ordered/grouped by source vertices or else 
-      // my tasks will not know which events are for which vertices' tasks. This 
+      // TODO this needs to be ordered/grouped by source vertices or else
+      // my tasks will not know which events are for which vertices' tasks. This
       // differentiation was not needed for MR because there was only 1 M stage.
-      // if the tce is sent to the task then a solution could be to add vertex 
+      // if the tce is sent to the task then a solution could be to add vertex
       // name to the tce
       // need to send vertex name and task index in that vertex
 
@@ -1060,7 +1060,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
       }
-      
+
       vertex.vertexScheduler.onSourceTaskCompleted(attemptId);
     }
   }
@@ -1073,7 +1073,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       TezDependentTaskCompletionEvent tce =
         ((VertexEventTaskAttemptCompleted) event).getCompletionEvent();
 
-      // TODO this should only be sent for successful events? looks like all 
+      // TODO this should only be sent for successful events? looks like all
       // need to be sent in the existing shuffle code
       // Notify all target vertices
       if (vertex.targetVertices != null) {
@@ -1245,7 +1245,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public TezDAGID getDAGId() {
     return getDAG().getID();
   }
-  
+
   @Override
   public ApplicationAttemptId getApplicationAttemptId() {
     return appContext.getApplicationAttemptId();
@@ -1259,7 +1259,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public DAG getDAG() {
     return appContext.getDAG();
   }
-  
+
   @VisibleForTesting
   String getProcessorName() {
     return this.processorName;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
new file mode 100644
index 0000000..3adcd8b
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeConnectionPattern;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestDAGImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
+  private DAGPlan dagPlan;
+  private TezDAGID dagId;
+  private TezConfiguration conf;
+  private DrainDispatcher dispatcher;
+  private Credentials fsTokens;
+  private AppContext appContext;
+  private ApplicationAttemptId appAttemptId;
+  private DAGImpl dag;
+  private VertexEventDispatcher vertexEventDispatcher;
+  private DagEventDispatcher dagEventDispatcher;
+  private TaskAttemptListener taskAttemptListener;
+  private TaskHeartbeatHandler thh;
+  private Clock clock = new SystemClock();
+  private JobTokenSecretManager jobTokenSecretManager;
+  private DAGFinishEventHandler dagFinishEventHandler;
+
+  private class DagEventDispatcher implements EventHandler<DAGEvent> { // forwards each DAGEvent to the DAG under test
+    @Override
+    public void handle(DAGEvent event) {
+      dag.handle(event); // 'dag' is the enclosing test's DAGImpl field
+    }
+  }
+
+  private class HistoryHandler implements EventHandler<DAGHistoryEvent> { // sink for DAGHistoryEvents
+    @Override
+    public void handle(DAGHistoryEvent event) { // no-op: the event is dropped
+    }
+  }
+
+  private class TaskEventHandler implements EventHandler<TaskEvent> { // sink for TaskEvents
+    @Override
+    public void handle(TaskEvent event) { // no-op: the event is dropped
+    }
+  }
+
+  private class VertexEventDispatcher // routes each VertexEvent to the vertex it names
+      implements EventHandler<VertexEvent> {
+
+    @SuppressWarnings("unchecked") // covers the cast of Vertex to EventHandler<VertexEvent> below
+    @Override
+    public void handle(VertexEvent event) {
+      Vertex vertex = dag.getVertex(event.getVertexId()); // look the target up in the DAG under test
+      ((EventHandler<VertexEvent>) vertex).handle(event);
+    }
+  }
+
+  private class DAGFinishEventHandler // counts the DAGFinishEvents it receives
+  implements EventHandler<DAGFinishEvent> {
+    public int dagFinishEvents = 0; // incremented once per handle() call
+
+    @Override
+    public void handle(DAGFinishEvent event) {
+      ++dagFinishEvents;
+    }
+  }
+
+  private DAGPlan createTestDAGPlan() { // builds a fixed plan: 6 vertices, 6 BIPARTITE/STABLE edges (v1,v2 -> v3 -> v4,v5 -> v6)
+    LOG.info("Setting up dag plan");
+    DAGPlan dag = DAGPlan.newBuilder() // note: this local shadows the DAGImpl field 'dag'
+        .setName("testverteximpl") // NOTE(review): name looks carried over from TestVertexImpl - confirm it is intended
+        .addVertex( // vertex1: no in-edges, 1 task, out-edge e1
+            VertexPlan.newBuilder()
+            .setName("vertex1")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(1)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x1.y1")
+                .build()
+                )
+            .addOutEdgeId("e1")
+            .build()
+            )
+        .addVertex( // vertex2: no in-edges, 2 tasks, out-edge e2
+            VertexPlan.newBuilder()
+            .setName("vertex2")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host2")
+                .addRack("rack2")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x2.y2")
+                .build()
+                )
+            .addOutEdgeId("e2")
+            .build()
+            )
+        .addVertex( // vertex3: in-edges e1,e2; out-edges e3,e4; the only vertex here with a processor name set
+            VertexPlan.newBuilder()
+            .setName("vertex3")
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorName("x3.y3")
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host3")
+                .addRack("rack3")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("foo") // the only non-empty java opts in this plan
+                .setTaskModule("x3.y3")
+                .build()
+                )
+            .addInEdgeId("e1")
+            .addInEdgeId("e2")
+            .addOutEdgeId("e3")
+            .addOutEdgeId("e4")
+            .build()
+            )
+        .addVertex( // vertex4: in-edge e3, out-edge e5
+            VertexPlan.newBuilder()
+            .setName("vertex4")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host4")
+                .addRack("rack4")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x4.y4")
+                .build()
+                )
+            .addInEdgeId("e3")
+            .addOutEdgeId("e5")
+            .build()
+            )
+        .addVertex( // vertex5: in-edge e4, out-edge e6
+            VertexPlan.newBuilder()
+            .setName("vertex5")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host5")
+                .addRack("rack5")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x5.y5")
+                .build()
+                )
+            .addInEdgeId("e4")
+            .addOutEdgeId("e6")
+            .build()
+            )
+        .addVertex( // vertex6: in-edges e5,e6; no out-edges
+            VertexPlan.newBuilder()
+            .setName("vertex6")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host6")
+                .addRack("rack6")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x6.y6")
+                .build()
+                )
+            .addInEdgeId("e5")
+            .addInEdgeId("e6")
+            .build()
+            )
+        .addEdge( // e1: vertex1 -> vertex3
+            EdgePlan.newBuilder()
+            .setInputClass("i3_v1")
+            .setInputVertexName("vertex1")
+            .setOutputClass("o1")
+            .setOutputVertexName("vertex3")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e1")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge( // e2: vertex2 -> vertex3
+            EdgePlan.newBuilder()
+            .setInputClass("i3_v2")
+            .setInputVertexName("vertex2")
+            .setOutputClass("o2")
+            .setOutputVertexName("vertex3")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e2")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge( // e3: vertex3 -> vertex4
+            EdgePlan.newBuilder()
+            .setInputClass("i4_v3")
+            .setInputVertexName("vertex3")
+            .setOutputClass("o3_v4")
+            .setOutputVertexName("vertex4")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e3")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge( // e4: vertex3 -> vertex5
+            EdgePlan.newBuilder()
+            .setInputClass("i5_v3")
+            .setInputVertexName("vertex3")
+            .setOutputClass("o3_v5")
+            .setOutputVertexName("vertex5")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e4")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge( // e5: vertex4 -> vertex6
+            EdgePlan.newBuilder()
+            .setInputClass("i6_v4")
+            .setInputVertexName("vertex4")
+            .setOutputClass("o4")
+            .setOutputVertexName("vertex6")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e5")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge( // e6: vertex5 -> vertex6
+            EdgePlan.newBuilder()
+            .setInputClass("i6_v5")
+            .setInputVertexName("vertex5")
+            .setOutputClass("o5")
+            .setOutputVertexName("vertex6")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e6")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .build();
+
+    return dag;
+  }
+
+  @Before
+  public void setup() { // builds a fresh DAGImpl and started dispatcher per test
+    conf = new TezConfiguration();
+    appAttemptId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(100, 1), 1);
+    dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+    Assert.assertNotNull(dagId);
+    dagPlan = createTestDAGPlan();
+    dispatcher = new DrainDispatcher();
+    fsTokens = new Credentials();
+    jobTokenSecretManager = new JobTokenSecretManager();
+    appContext = mock(AppContext.class); // stubbed below with this test's ids
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(dagId).when(appContext).getDAGID();
+    dag = new DAGImpl(dagId, appAttemptId, conf, dagPlan,
+        dispatcher.getEventHandler(),  taskAttemptListener,
+        jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext);
+    doReturn(dag).when(appContext).getDAG(); // needs the DAG, so stubbed last
+    vertexEventDispatcher = new VertexEventDispatcher();
+    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+    dagEventDispatcher = new DagEventDispatcher();
+    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dispatcher.register(HistoryEventType.class,
+        new HistoryHandler());
+    dagFinishEventHandler = new DAGFinishEventHandler();
+    dispatcher.register(DAGFinishEvent.Type.class, dagFinishEventHandler);
+    dispatcher.register(TaskEventType.class, new TaskEventHandler());
+    dispatcher.init(conf); // every handler above is registered before init/start
+    dispatcher.start();
+  }
+
+  @After
+  public void teardown() { // drains and stops the dispatcher, then drops fixtures
+    dispatcher.await(); // NOTE(review): handlers may still read dag; drain first
+    dispatcher.stop();
+    dagPlan = null;
+    dag = null;
+  }
+
+  private void initDAG(DAGImpl dag) { // sends DAG_INIT and expects INITED
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+    Assert.assertEquals(DAGState.INITED, dag.getState());
+  }
+
+  private void startDAG(DAGImpl dag) { // sends DAG_START and expects RUNNING
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START));
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+  }
+
+  @Test
+  public void testDAGInit() { // an inited DAG must report six vertices
+    initDAG(dag);
+    Assert.assertEquals(6, dag.getTotalVertices());
+  }
+
+  /**
+   * Starts the DAG and checks each vertex's state and distance from root.
+   */
+  @Test
+  public void testDAGStart() {
+    // Expected getDistanceFromRoot() value for vertex ids 0 through 5.
+    final int[] expectedDistance = {0, 0, 1, 2, 2, 3};
+
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    // Every vertex must be running and sit at its expected distance.
+    for (int idx = 0; idx < expectedDistance.length; ++idx) {
+      Vertex vertex = dag.getVertex(new TezVertexID(dagId, idx));
+      Assert.assertEquals(VertexState.RUNNING, vertex.getState());
+      Assert.assertEquals(expectedDistance[idx],
+          vertex.getDistanceFromRoot());
+    }
+
+    // Logging is kept in its own pass so that nothing is logged unless all
+    // of the checks above have passed.
+    for (int idx = 0; idx < expectedDistance.length; ++idx) {
+      LOG.info("Distance from root: v" + idx + ":"
+          + dag.getVertex(new TezVertexID(dagId, idx)).getDistanceFromRoot());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testVertexCompletion() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    TezVertexID vertexId = new TezVertexID(dagId, 1);
+    Vertex vertex = dag.getVertex(vertexId);
+    EventHandler<VertexEvent> handler = (EventHandler<VertexEvent>) vertex;
+    for (int taskIdx = 0; taskIdx < 2; ++taskIdx) { // tasks 0 and 1 succeed
+      handler.handle(new VertexEventTaskCompleted(
+          new TezTaskID(vertexId, taskIdx), TaskState.SUCCEEDED));
+    }
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
+    Assert.assertEquals(1, dag.getSuccessfulVertices());
+  }
+
+  // NOTE(review): this method lacked @Test, so JUnit never ran it. Kept under
+  // @Ignore: testKillRunningDAG sees KILL_WAIT, not KILLED, after DAG_KILL.
+  @Test
+  @Ignore
+  public void testKillStartedDAG() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    for (int i = 0 ; i < 6; ++i ) {
+      Vertex v = dag.getVertex(new TezVertexID(dagId, i));
+      Assert.assertEquals(VertexState.KILLED, v.getState());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillRunningDAG() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    // One completed task leaves vertex id 1 running but finishes vertex id 0.
+    TezVertexID partialId = new TezVertexID(dagId, 1);
+    Vertex partial = dag.getVertex(partialId);
+    ((EventHandler<VertexEvent>) partial).handle(new VertexEventTaskCompleted(
+        new TezTaskID(partialId, 0), TaskState.SUCCEEDED));
+    TezVertexID doneId = new TezVertexID(dagId, 0);
+    Vertex done = dag.getVertex(doneId);
+    ((EventHandler<VertexEvent>) done).handle(new VertexEventTaskCompleted(
+        new TezTaskID(doneId, 0), TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, done.getState());
+    Assert.assertEquals(VertexState.RUNNING, partial.getState());
+
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.await();
+
+    // The finished vertex keeps its state; every other vertex awaits the kill.
+    Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, done.getState());
+    Assert.assertEquals(VertexState.KILL_WAIT, partial.getState());
+    for (int idx = 2; idx < 6; ++idx) {
+      Vertex other = dag.getVertex(new TezVertexID(dagId, idx));
+      Assert.assertEquals(VertexState.KILL_WAIT, other.getState());
+    }
+    Assert.assertEquals(1, dag.getSuccessfulVertices());
+  }
+
+  @Test
+  public void testInvalidEvent() { // DAG_START without a prior DAG_INIT
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.ERROR, dag.getState());
+  }
+
+  @Test
+  @Ignore
+  public void testVertexSuccessfulCompletionUpdates() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    // Vertex id 0 is reported as succeeded six times in a row; the DAG is
+    // expected to count it once and to keep running.
+    int repeat = 0;
+    while (repeat < 6) {
+      dag.handle(new DAGEventVertexCompleted(
+          new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+      ++repeat;
+    }
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    Assert.assertEquals(1, dag.getSuccessfulVertices());
+
+    // One success report for each remaining vertex, ids 1 through 5, is
+    // expected to complete the DAG.
+    for (int vertexIdx = 1; vertexIdx <= 5; ++vertexIdx) {
+      dag.handle(new DAGEventVertexCompleted(
+          new TezVertexID(dagId, vertexIdx), VertexState.SUCCEEDED));
+    }
+    dispatcher.await();
+    Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
+    Assert.assertEquals(6, dag.getSuccessfulVertices());
+  }
+
+  @Test
+  @Ignore
+  public void testVertexFailureHandling() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    // A second success followed by one failure is expected to fail the DAG.
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 2), VertexState.FAILED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(2, dag.getSuccessfulVertices());
+
+    // Vertices that were still running should be killed on the first failure.
+    for (int idx = 3; idx <= 5; ++idx) {
+      Vertex pending = dag.getVertex(new TezVertexID(dagId, idx));
+      Assert.assertEquals(VertexState.KILL_WAIT, pending.getState());
+    }
+  }
+
+  @Test
+  @Ignore
+  public void testDAGKill() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    // The kill request is sent right after the second vertex succeeds ...
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    // ... and the remaining four vertices still report success afterwards.
+    for (int vertexIdx = 2; vertexIdx <= 5; ++vertexIdx) {
+      dag.handle(new DAGEventVertexCompleted(
+          new TezVertexID(dagId, vertexIdx), VertexState.SUCCEEDED));
+    }
+    dispatcher.await();
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(6, dag.getSuccessfulVertices());
+    Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+  }
+
+  @Test
+  public void testDAGKillPending() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    // The kill request is sent right after the second vertex succeeds.
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    // Three more vertices succeed, which leaves vertex id 5 outstanding.
+    for (int vertexIdx = 2; vertexIdx <= 4; ++vertexIdx) {
+      dag.handle(new DAGEventVertexCompleted(
+          new TezVertexID(dagId, vertexIdx), VertexState.SUCCEEDED));
+    }
+    dispatcher.await();
+    Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+    // Once the last vertex reports KILLED the DAG is expected to be KILLED.
+    dag.handle(new DAGEventVertexCompleted(
+        new TezVertexID(dagId, 5), VertexState.KILLED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(5, dag.getSuccessfulVertices());
+    Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+  }
+
+  @Test
+  public void testDiagnosticUpdates() {
+    // FIXME not implemented yet, so this test passes without any checks
+  }
+
+  @Test
+  public void testCounterUpdates() {
+    // FIXME not implemented yet, so this test passes without any checks
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8298190d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fe6ae14..f78a7b8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -62,6 +62,8 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -115,7 +117,7 @@ public class TestVertexImpl {
     public int abortCounter = 0;
     private boolean throwError;
     private boolean throwErrorOnAbort;
-    
+
     public CountingVertexOutputCommitter(boolean throwError,
         boolean throwOnAbort) {
       this.throwError = throwError;
@@ -125,7 +127,7 @@ public class TestVertexImpl {
     public CountingVertexOutputCommitter() {
       this(false, false);
     }
-    
+
     @Override
     public void init(VertexContext context) throws IOException {
       ++initCounter;
@@ -150,12 +152,26 @@ public class TestVertexImpl {
       if (throwErrorOnAbort) {
         throw new IOException("I can throwz exceptions in abort");
       }
-    }    
+    }
   }
-  
+
+  private class TaskEventHandler implements EventHandler<TaskEvent> {
+    @Override
+    public void handle(TaskEvent event) { // no-op: task events are discarded
+    }
+  }
+
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
+    public Map<DAGEventType, Integer> eventCount =
+        new HashMap<DAGEventType, Integer>();
+
     @Override
     public void handle(DAGEvent event) {
+      // Bump the tally kept for this event's type; the first one counts as 1.
+      DAGEventType type = event.getType();
+      Integer seen = eventCount.get(type);
+      int count = (seen == null) ? 1 : seen + 1;
+      eventCount.put(type, count);
     }
   }
 
@@ -164,7 +180,7 @@ public class TestVertexImpl {
     public void handle(DAGHistoryEvent event) {
     }
   }
-  
+
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -421,7 +437,7 @@ public class TestVertexImpl {
 
       Map<Vertex, EdgeProperty> outVertices =
           new HashMap<Vertex, EdgeProperty>();
-      
+
       for(String inEdgeId : vertexPlan.getInEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(inEdgeId);
         Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName());
@@ -469,6 +485,7 @@ public class TestVertexImpl {
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(HistoryEventType.class,
         new HistoryHandler());
+    dispatcher.register(TaskEventType.class, new TaskEventHandler());
     dispatcher.init(conf);
     dispatcher.start();
   }
@@ -500,10 +517,10 @@ public class TestVertexImpl {
     if (checkKillWait) {
       Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
     } else {
-      Assert.assertEquals(VertexState.KILLED, v.getState());      
+      Assert.assertEquals(VertexState.KILLED, v.getState());
     }
   }
-  
+
   private void startVertex(VertexImpl v,
       boolean checkRunningState) {
     Assert.assertEquals(VertexState.INITED, v.getState());
@@ -522,10 +539,10 @@ public class TestVertexImpl {
 
     VertexImpl v3 = vertices.get("vertex3");
     initVertex(v3);
-    
+
     Assert.assertEquals("x3.y3", v3.getProcessorName());
     Assert.assertEquals("foo", v3.getJavaOpts());
-    
+
     Assert.assertEquals(2, v3.getInputSpecList().size());
     Assert.assertEquals(2, v3.getInputVerticesCount());
     Assert.assertEquals(2, v3.getOutputVerticesCount());
@@ -547,7 +564,7 @@ public class TestVertexImpl {
         .getInputClassName())
         || "i3_v2".equals(v3.getInputSpecList().get(1)
             .getInputClassName()));
-    
+
     Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
         .getVertexName())
         || "vertex5".equals(v3.getOutputSpecList().get(0)
@@ -606,7 +623,7 @@ public class TestVertexImpl {
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
-    
+
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
@@ -632,7 +649,7 @@ public class TestVertexImpl {
         StringUtils.join(",", v.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
   }
-  
+
   @Test
   public void testVertexWithNoTasks() {
     // FIXME a vertex with no tasks should not be allowed
@@ -644,14 +661,14 @@ public class TestVertexImpl {
   }
 
   @Test
-  public void testVertexKill() {
+  public void testVertexKillDiagnostics() {
     VertexImpl v1 = vertices.get("vertex1");
     killVertex(v1, false);
     String diagnostics =
         StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains(
         "vertex received kill in new state"));
-    
+
     VertexImpl v2 = vertices.get("vertex2");
     initVertex(v2);
     killVertex(v2, false);
@@ -672,30 +689,73 @@ public class TestVertexImpl {
   }
 
   @Test
-  public void testKilledTasksHandling() {
+  public void testVertexKillPending() {
     VertexImpl v = vertices.get("vertex2");
     initVertex(v);
+    VertexImpl v3 = vertices.get("vertex3");
+    initVertex(v3);
+
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+    v.handle(new VertexEventTaskCompleted(
+        new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.KILLED));
+    v.handle(new VertexEventTaskCompleted(
+        new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(VertexState.KILLED, v.getState());
+  }
+
+  @Test
+  @Ignore
+  public void testVertexKill() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    VertexImpl v3 = vertices.get("vertex3");
+    initVertex(v3);
 
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.KILLED));
+    startVertex(v);
+
+    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+    v.handle(new VertexEventTaskCompleted(
+        new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+
+    v.handle(new VertexEventTaskCompleted(
+        new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
 
   @Test
+  @Ignore
+  public void testKilledTasksHandling() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState());
+  }
+
+  @Test
   public void testVertexCommitterInit() {
     VertexImpl v2 = vertices.get("vertex2");
     initVertex(v2);
     Assert.assertTrue(v2.getVertexOutputCommitter()
         instanceof NullVertexOutputCommitter);
-    
+
     VertexImpl v6 = vertices.get("vertex6");
     initVertex(v6);
     Assert.assertTrue(v6.getVertexOutputCommitter()
@@ -708,13 +768,13 @@ public class TestVertexImpl {
     initVertex(v2);
     Assert.assertTrue(v2.getVertexScheduler()
         instanceof ImmediateStartVertexScheduler);
-    
+
     VertexImpl v6 = vertices.get("vertex6");
     initVertex(v6);
     Assert.assertTrue(v6.getVertexScheduler()
         instanceof BipartiteSlowStartVertexScheduler);
   }
-  
+
   @Test
   public void testVertexTaskFailure() {
     VertexImpl v = vertices.get("vertex2");
@@ -730,7 +790,7 @@ public class TestVertexImpl {
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
-    
+
     v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
     v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
     dispatcher.await();
@@ -765,9 +825,9 @@ public class TestVertexImpl {
   public void testDiagnostics() {
     // FIXME need to test diagnostics in various cases
   }
-  
+
   @Test
-  public void testTaskAttemptCompletionEvents() {    
+  public void testTaskAttemptCompletionEvents() {
     // FIXME need to test handling of task attempt events
   }
 
@@ -821,7 +881,7 @@ public class TestVertexImpl {
     v4.handle(new VertexEventTaskAttemptCompleted(cEvt3));
     v5.handle(new VertexEventTaskAttemptCompleted(cEvt4));
     v5.handle(new VertexEventTaskAttemptCompleted(cEvt5));
-    v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));    
+    v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));
 
     v4.handle(new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED));
     v4.handle(new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED));
@@ -850,7 +910,9 @@ public class TestVertexImpl {
     v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
-
+    Assert.assertEquals(1,
+        dagEventDispatcher.eventCount.get(
+            DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
 
   @Test
@@ -861,7 +923,7 @@ public class TestVertexImpl {
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter();
     v.setVertexOutputCommitter(committer);
-    
+
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -873,14 +935,14 @@ public class TestVertexImpl {
     // v.handle(new VertexEventTaskReschedule(t1));
     v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.RUNNING, v.getState());    
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(0, committer.commitCounter);
-    
+
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
-    
+
   }
 
   @Test
@@ -890,7 +952,7 @@ public class TestVertexImpl {
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter();
     v.setVertexOutputCommitter(committer);
-    
+
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -900,23 +962,23 @@ public class TestVertexImpl {
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
-    
+
     v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
     Assert.assertEquals(0, committer.abortCounter);
     Assert.assertEquals(0, committer.initCounter); // already done in init
     Assert.assertEquals(0, committer.setupCounter); // already done in init
   }
-  
+
   @Test
   public void testCommitterInitAndSetup() {
     // FIXME need to add a test for this
   }
-  
+
   @Test
   public void testTaskAttemptFetchFailureHandling() {
     // FIXME needs testing
@@ -929,7 +991,7 @@ public class TestVertexImpl {
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter(true, true);
     v.setVertexOutputCommitter(committer);
-    
+
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
@@ -938,13 +1000,28 @@ public class TestVertexImpl {
     v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.FAILED, v.getState());    
+    Assert.assertEquals(VertexState.FAILED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
-    
+
     // FIXME need to verify whether abort needs to be called if commit fails
     Assert.assertEquals(0, committer.abortCounter);
     Assert.assertEquals(0, committer.initCounter); // already done in init
-    Assert.assertEquals(0, committer.setupCounter); // already done in init  
+    Assert.assertEquals(0, committer.setupCounter); // already done in init
+  }
+
+  @Test
+  public void testHistoryEventGeneration() { // FIXME need to implement
+  }
+
+  @Test
+  public void testInvalidEvent() { // V_START without a prior initVertex()
+    VertexImpl v = vertices.get("vertex2");
+    v.handle(new VertexEvent(v.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.ERROR, v.getState());
+    Assert.assertEquals(1,
+        dagEventDispatcher.eventCount.get(
+            DAGEventType.INTERNAL_ERROR).intValue());
   }
-  
 }