You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/07/09 02:48:50 UTC

[2/2] git commit: TEZ-141; TEZ-143; TEZ-148; TEZ-283; TEZ-284: Improve failure/kill handling, track TerminationCauses, add dag.kill()

TEZ-141;TEZ-143;TEZ-148;TEZ-283;TEZ-284: Improve failure/kill handling, track TerminationCauses, add dag.kill()


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/999d16ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/999d16ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/999d16ae

Branch: refs/heads/master
Commit: 999d16ae1022884db4a17e2b803abe52ea82fcf0
Parents: 92228ba
Author: Mike Liddell <ml...@apache.org>
Authored: Mon Jul 8 17:48:18 2013 -0700
Committer: Mike Liddell <ml...@apache.org>
Committed: Mon Jul 8 17:48:18 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/client/DAGClient.java    |   7 +
 .../apache/tez/dag/api/client/DAGStatus.java    |   3 +-
 .../apache/tez/dag/api/client/VertexStatus.java |   5 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |  18 ++
 tez-dag-api/src/main/proto/DAGApiRecords.proto  |   2 +
 .../src/main/proto/DAGClientAMProtocol.proto    |  10 +
 .../tez/dag/api/client/DAGStatusBuilder.java    |   3 +-
 .../tez/dag/api/client/VertexStatusBuilder.java |   3 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  15 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +-
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   4 +-
 .../org/apache/tez/dag/app/dag/DAGState.java    |   2 +-
 .../tez/dag/app/dag/DAGTerminationCause.java    |  38 ++++
 .../tez/dag/app/dag/TaskTerminationCause.java   |  36 ++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   1 +
 .../org/apache/tez/dag/app/dag/VertexState.java |   2 +-
 .../tez/dag/app/dag/VertexTerminationCause.java |  43 ++++
 .../dag/app/dag/event/TaskEventTermination.java |  37 ++++
 .../tez/dag/app/dag/event/TaskEventType.java    |   4 +-
 .../app/dag/event/VertexEventTermination.java   |  43 ++++
 .../tez/dag/app/dag/event/VertexEventType.java  |   3 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 187 +++++++++++-----
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  24 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 213 ++++++++++++++-----
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  51 +++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   6 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  59 ++---
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  34 ++-
 .../tez/mapreduce/hadoop/TestMRHelpers.java     |  12 +-
 29 files changed, 699 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 7ec4e22..3f5d07a 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -57,4 +57,11 @@ public interface DAGClient extends Closeable {
    */
   public VertexStatus getVertexStatus(String vertexName)
       throws IOException, TezException;
+  
+  /**
+   * Request that a running DAG be killed. Termination may not be complete
+   * when this returns; getDAGStatus() may report RUNNING until it finishes.
+   */
+  public void tryKillDAG()
+      throws TezException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 8249626..c28f2bf 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -35,7 +35,7 @@ public class DAGStatus {
     SUCCEEDED,
     KILLED,
     FAILED,
-    ERROR
+    ERROR,
   };
   
   DAGStatusProtoOrBuilder proxy = null;
@@ -52,6 +52,7 @@ public class DAGStatus {
       return DAGStatus.State.SUBMITTED;
     case DAG_INITING:
       return DAGStatus.State.INITING;
+    case DAG_TERMINATING: // For simplicity, DAG_TERMINATING is presented to user as 'still running'
     case DAG_RUNNING:
       return DAGStatus.State.RUNNING;
     case DAG_SUCCEEDED:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index 185bc43..ce5dbe0 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -31,7 +31,8 @@ public class VertexStatus {
     SUCCEEDED,
     KILLED,
     FAILED,
-    ERROR
+    ERROR,
+    TERMINATING,
   };
   
   VertexStatusProtoOrBuilder proxy = null;
@@ -55,6 +56,8 @@ public class VertexStatus {
       return VertexStatus.State.KILLED;
     case VERTEX_ERROR:
       return VertexStatus.State.ERROR;
+    case VERTEX_TERMINATING:
+      return VertexStatus.State.TERMINATING;
     default:
       throw new TezUncheckedException("Unsupported value for VertexStatus.State : " + 
                               proxy.getState());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 4a4163c..b405de3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
 
@@ -98,6 +99,21 @@ public class DAGClientRPCImpl implements DAGClient {
     // need AM for this. Later maybe from History
     return null;
   }
+  @Override
+  public void tryKillDAG() throws TezException, IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
+    }
+    if(createAMProxyIfNeeded()) { // NOTE(review): when no AM proxy is available the kill request is silently skipped
+      TryKillDAGRequestProto requestProto =
+          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+      try {
+        proxy.tryKillDAG(null, requestProto);
+      } catch (ServiceException e) {
+        resetProxy(e); // NOTE(review): RPC failure is only passed to resetProxy; confirm the caller is meant to not see it
+      }
+    }
+  }
 
   @Override
   public void close() throws IOException {
@@ -137,6 +153,8 @@ public class DAGClientRPCImpl implements DAGClient {
     }
   }
   
+  
+  
   DAGStatus getDAGStatusViaRM() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
index b062c15..b5fc5b7 100644
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-dag-api/src/main/proto/DAGApiRecords.proto
@@ -141,6 +141,7 @@ enum VertexStatusStateProto {
   VERTEX_KILLED = 3;
   VERTEX_FAILED = 4;
   VERTEX_ERROR = 5;
+  VERTEX_TERMINATING = 6;
 }
 
 message VertexStatusProto {
@@ -157,6 +158,7 @@ enum DAGStatusStateProto {
   DAG_KILLED = 4;
   DAG_FAILED = 5;
   DAG_ERROR = 6;
+  DAG_TERMINATING = 7;
 }
 
 message StringProgressPairProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
index 18247a1..dd8f3d5 100644
--- a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
@@ -49,8 +49,18 @@ message GetVertexStatusResponseProto {
   optional VertexStatusProto vertexStatus = 1;
 }
 
+message TryKillDAGRequestProto {
+  optional string dagId = 1; // DAG id in string form; the AM resolves it via getDAG()
+}
+
+message TryKillDAGResponseProto {
+  // Intentionally empty: no fields are returned yet.
+}
+
+
 service DAGClientAMProtocol {
   rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
   rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
   rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
+  rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
index 3247d43..2b0f543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
@@ -71,8 +71,9 @@ public class DAGStatusBuilder extends DAGStatus {
     case FAILED:
       return DAGStatusStateProto.DAG_FAILED;
     case KILLED:
-    case KILL_WAIT:
       return DAGStatusStateProto.DAG_KILLED;
+    case TERMINATING:
+      return DAGStatusStateProto.DAG_TERMINATING;
     case ERROR:
       return DAGStatusStateProto.DAG_ERROR;
     default:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 606d8da..66de71f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -63,8 +63,9 @@ public class VertexStatusBuilder extends VertexStatus {
     case FAILED:
       return VertexStatusStateProto.VERTEX_FAILED;
     case KILLED:
-    case KILL_WAIT:
       return VertexStatusStateProto.VERTEX_KILLED;
+    case TERMINATING:
+      return VertexStatusStateProto.VERTEX_TERMINATING;
     case ERROR:
       return VertexStatusStateProto.VERTEX_ERROR;
     default:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 2b9c723..1d12ffe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -32,6 +32,8 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGResponseProto;
 import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 
 import com.google.protobuf.RpcController;
@@ -89,6 +91,19 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
     }
   }
   
+  @Override
+  public TryKillDAGResponseProto tryKillDAG(RpcController controller,
+      TryKillDAGRequestProto request) throws ServiceException {
+    try {
+      String dagId = request.getDagId();
+      real.tryKillDAG(dagId); // forward to the AM-side handler; the response carries no payload yet
+      return TryKillDAGResponseProto.newBuilder().build();
+    } catch (TezException e) {
+      throw wrapException(e); // surface AM-side failures to the RPC client as a ServiceException
+    }
+  }
+  
+  
   ServiceException wrapException(Exception e){
     return new ServiceException(e);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index bc14f90..c276e4f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -695,7 +695,7 @@ public class DAGAppMaster extends CompositeService {
         state = DAGAppMasterState.FAILED;
         break;
       case KILLED:
-        state = DAGAppMasterState.FAILED;
+        state = DAGAppMasterState.KILLED;
       case ERROR:
         state = DAGAppMasterState.ERROR;
       default:
@@ -736,6 +736,12 @@ public class DAGAppMaster extends CompositeService {
       }
       return dag;
     }
+    /** Resolves dagIdStr via getDAG() and asks that DAG to kill itself via DAG#tryKill(). */
+    public void tryKillDAG(String dagIdStr)
+        throws TezException {
+      DAG dag = getDAG(dagIdStr); // NOTE(review): assumes getDAG() throws TezException for an unknown id rather than returning null
+      dag.tryKill();
+    }
   }
 
   private class RunningAppContext implements AppContext {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 913b565..144dd24 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -32,7 +32,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
 /**
- * Main interface to interact with the job. Provides only getters. 
+ * Main interface to interact with the job.  
  */
 public interface DAG {
 
@@ -77,5 +77,5 @@ public interface DAG {
   //List<AMInfo> getAMInfos();
   
   boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
-
+  void tryKill();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
index a7de770..b83e9a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
@@ -25,5 +25,5 @@ public enum DAGState {
   FAILED,
   KILLED,
   ERROR, 
-  KILL_WAIT
+  TERMINATING
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
new file mode 100644
index 0000000..05f15f3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+
+/**
+ * Represents the proximate cause of a DAG transition to FAILED or KILLED.
+ */
+public enum DAGTerminationCause {
+
+  /** The DAG was directly killed. */
+  DAG_KILL, 
+  
+  /** One of the DAG's vertices failed. */
+  VERTEX_FAILURE, 
+  
+  /** The DAG failed because it had zero vertices. */
+  ZERO_VERTICES, 
+  
+  /** The DAG failed during initialization. */
+  INIT_FAILURE,   
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
new file mode 100644
index 0000000..73741f0
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+
+/**
+ * Represents the proximate cause of a Task transition to FAILED or KILLED.
+ */
+public enum TaskTerminationCause {
+
+  /** The DAG was killed. */
+  DAG_KILL, 
+  
+  /** Another vertex failed, causing the DAG to fail and this task's parent vertex to be killed. */
+  OTHER_VERTEX_FAILURE,
+  
+  /** Another task of this task's parent vertex failed. */
+  OTHER_TASK_FAILURE, 
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f55dff5..9c4d3b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -81,4 +81,5 @@ public interface Vertex extends Comparable<Vertex> {
   Resource getTaskResource();
 
   public DAG getDAG();
+  VertexTerminationCause getTerminationCause();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 9b0e19a..887c0d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -25,5 +25,5 @@ public enum VertexState {
   FAILED,
   KILLED,
   ERROR,
-  KILL_WAIT,
+  TERMINATING,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
new file mode 100644
index 0000000..138ee70
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+/**
+ * Represents the proximate cause of a Vertex transition to FAILED or KILLED.
+ */
+public enum VertexTerminationCause {
+
+  /** The DAG was killed. */
+  DAG_KILL, 
+
+  /** Another vertex failed, causing the DAG to fail and this vertex to be killed. */
+  OTHER_VERTEX_FAILURE,
+
+  /** One of this vertex's own tasks failed. */
+  OWN_TASK_FAILURE, 
+
+  /** This vertex failed during commit. */
+  COMMIT_FAILURE,
+
+  /** This vertex failed because it had zero tasks. */
+  ZERO_TASKS, 
+
+  /** This vertex failed during initialization. */
+  INIT_FAILURE
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
new file mode 100644
index 0000000..130e86f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TezTaskID;
+
+public class TaskEventTermination extends TaskEvent {
+  /** Why the task is being terminated; this event's type is always T_TERMINATE. */
+  private final TaskTerminationCause terminationCause;
+  /** Creates a T_TERMINATE event for taskID carrying the given termination cause. */
+  public TaskEventTermination(TezTaskID taskID, TaskTerminationCause terminationCause) {
+    super(taskID, TaskEventType.T_TERMINATE);
+    this.terminationCause = terminationCause;
+  }
+  /** @return the termination cause supplied when this event was created. */
+  public TaskTerminationCause getTerminationCause() {
+    return terminationCause;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index d2e492e..bc7f3ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -24,8 +24,8 @@ package org.apache.tez.dag.app.dag.event;
 public enum TaskEventType {
 
   //Producer:Client, Job
-  T_KILL,
-
+  T_TERMINATE,
+  
   //Producer:Job
   T_SCHEDULE,
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
new file mode 100644
index 0000000..647f600
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Specialized VertexEvent for termination conditions.
+ *
+ * The VertexEventType is always V_TERMINATE.
+ * The terminationCause denotes the specific reason why the vertex is being terminated.
+ */
+public class VertexEventTermination extends VertexEvent {
+
+  private final VertexTerminationCause terminationCause;
+  /** Creates a V_TERMINATE event for vertexId carrying the given termination cause. */
+  public VertexEventTermination(TezVertexID vertexId, VertexTerminationCause terminationCause) {
+    super(vertexId, VertexEventType.V_TERMINATE);
+    this.terminationCause = terminationCause;
+  }
+  /** @return the termination cause supplied when this event was created. */
+  public VertexTerminationCause getTerminationCause() {
+    return terminationCause;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 7e874d0..43cffe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -24,7 +24,7 @@ package org.apache.tez.dag.app.dag.event;
 public enum VertexEventType {
 
   //Producer:Client, Job
-  V_KILL,
+  V_TERMINATE,
 
   //Producer:Job
   V_INIT,
@@ -56,4 +56,5 @@ public enum VertexEventType {
   V_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
   V_COUNTER_UPDATE,
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 9e4f00b..be25c66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -65,10 +65,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.DAGReport;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -78,8 +80,11 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
@@ -194,11 +199,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // Transitions from RUNNING state
           .addTransition
               (DAGState.RUNNING,
-              EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
+              EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
-          .addTransition(DAGState.RUNNING, DAGState.KILL_WAIT,
-              DAGEventType.DAG_KILL, new KillVerticesTransition())
+          .addTransition(DAGState.RUNNING, DAGState.TERMINATING,
+              DAGEventType.DAG_KILL, new DAGKilledTransition())
           .addTransition(DAGState.RUNNING, DAGState.RUNNING,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -212,21 +217,25 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from KILL_WAIT state.
+          // Transitions from TERMINATING state.
           .addTransition
-              (DAGState.KILL_WAIT,
-              EnumSet.of(DAGState.KILL_WAIT, DAGState.KILLED),
+              (DAGState.TERMINATING,
+              EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
-          .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
               DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              DAGState.KILL_WAIT,
+              DAGState.TERMINATING,
               DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          
+              // Ignore-able events
+          .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
+              EnumSet.of(DAGEventType.DAG_KILL))
 
           // Transitions from SUCCEEDED state
           .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
@@ -296,7 +305,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private int numFailedVertices = 0;
   private int numKilledVertices = 0;
   private boolean isUber = false;
-
+  private DAGTerminationCause terminationCause;
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
@@ -484,6 +493,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       this.readLock.unlock();
     }
   }
+  
+  @Override
+  public void tryKill() {
+    // Request a kill: send a DAG_KILL event for this DAG (RUNNING -> TERMINATING; ignored when TERMINATING).
+    eventHandler.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+  }
 
   @Override
   public Map<TezVertexID, Vertex> getVertices() {
@@ -647,37 +662,55 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
           + ", numFailedVertices=" + dag.numFailedVertices
           + ", numKilledVertices=" + dag.numKilledVertices
-          + ", numVertices=" + dag.numVertices);
-    }
-
-    if (dag.numFailedVertices > 0) {
-      dag.setFinishTime();
-      String diagnosticMsg = "DAG failed as vertices failed. " +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      dag.abortJob(DAGStatus.State.FAILED);
-      return dag.finished(DAGState.FAILED);
+          + ", numVertices=" + dag.numVertices
+          + ", terminationCause=" + dag.terminationCause);
     }
 
-    if(dag.numSuccessfulVertices == dag.numVertices) {
-      dag.setFinishTime();
-      dag.logJobHistoryFinishedEvent();
-      return dag.finished(DAGState.SUCCEEDED);
+    // log in case of accounting error.
+    if (dag.numCompletedVertices > dag.numVertices) {
+      LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices"
+          + ", numCompletedVertices=" + dag.numCompletedVertices
+          + ", numVertices=" + dag.numVertices
+          );
     }
-
+    
     if (dag.numCompletedVertices == dag.numVertices) {
-      // this means the dag has some killed vertices
-      String diagnosticMsg = "DAG killed. " +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      assert dag.numKilledVertices > 0;
-      dag.setFinishTime();
-      dag.abortJob(DAGStatus.State.KILLED);
-      return dag.finished(DAGState.KILLED);
+      //Only succeed if vertices complete successfully and no terminationCause is registered.
+      if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
+        dag.setFinishTime();
+        dag.logJobHistoryFinishedEvent();
+        return dag.finished(DAGState.SUCCEEDED);
+      }
+      else if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
+        dag.setFinishTime();
+        String diagnosticMsg = "DAG killed due to user-initiated kill." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        dag.abortJob(DAGStatus.State.KILLED);
+        return dag.finished(DAGState.KILLED);
+      }
+      if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
+        dag.setFinishTime();
+        String diagnosticMsg = "DAG failed due to vertex failure." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        dag.abortJob(DAGStatus.State.FAILED);
+        return dag.finished(DAGState.FAILED);
+      }
+      else {
+        // should never get here.
+        throw new TezUncheckedException("All vertices complete, but cannot determine final state of DAG"
+            + ", numCompletedVertices=" + dag.numCompletedVertices
+            + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+            + ", numFailedVertices=" + dag.numFailedVertices
+            + ", numKilledVertices=" + dag.numKilledVertices
+            + ", numVertices=" + dag.numVertices
+            + ", terminationCause=" + dag.terminationCause);
+      }
     }
 
     //return the current state, Job not finished yet
@@ -745,6 +778,29 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       readLock.unlock();
     }
   }
+  
+  /**
+   * Records the DAG's termination cause unless one is already recorded (the first cause wins).
+   * NOTE(review): no lock is taken here; presumably callers hold the write lock via a state transition — confirm.
+   * @param trigger the termination cause to record
+   * @return true if this call recorded {@code trigger}; false if a cause was already set
+   */
+  boolean trySetTerminationCause(DAGTerminationCause trigger) {
+    if(terminationCause == null){
+      terminationCause = trigger;
+      return true;
+    }
+    return false;
+  }
+  
+  DAGTerminationCause getTerminationCause() { // read under readLock; null while no cause has been recorded
+    readLock.lock();
+    try {
+      return terminationCause;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
   /*
    * (non-Javadoc)
@@ -755,7 +811,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     // TODO ApplicationACLs
     return null;
   }
-
+  
   // TODO Recovery
   /*
   @Override
@@ -786,6 +842,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.numVertices = dag.getJobPlan().getVertexCount();
         if (dag.numVertices == 0) {
           dag.addDiagnostic("No vertices for dag");
+          dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
           dag.abortJob(DAGStatus.State.FAILED);
           return dag.finished(DAGState.FAILED);
         }
@@ -817,6 +874,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         LOG.warn("Job init failed", e);
         dag.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
+        dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
         dag.abortJob(DAGStatus.State.FAILED);
         // TODO Metrics
         //dag.metrics.endPreparingJob(dag);
@@ -1008,6 +1066,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       this.fullCounters.incrAllCounters(v.getAllCounters());
     }
   }
+  
+  /**
+   * Records {@code dagTerminationCause} if no cause is set yet and, only in that case,
+   * sends a VertexEventTermination to every vertex, so vertices are notified at most once.
+   * @param dagTerminationCause why the DAG is moving towards KILLED/FAILED
+   * @param vertexTerminationCause the cause carried by each termination event sent to the vertices
+   */
+  void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
+    // A later cause (e.g. a second kill or failure) is ignored and triggers no further vertex events.
+    if(trySetTerminationCause(dagTerminationCause)){
+      for (Vertex v : vertices.values()) {
+        eventHandler.handle(
+            new VertexEventTermination(v.getVertexId(), vertexTerminationCause)
+            );
+      }
+    }
+  }
 
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
@@ -1017,6 +1092,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public void transition(DAGImpl job, DAGEvent event) {
       job.setFinishTime();
       job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.KILLED);
+      job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
       job.finished(DAGState.KILLED);
     }
   }
@@ -1025,25 +1101,24 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   implements SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
+      job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
       job.abortJob(DAGStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
       job.finished(DAGState.KILLED);
     }
   }
 
-  private static class KillVerticesTransition
+  private static class DAGKilledTransition // RUNNING --DAG_KILL--> TERMINATING
      implements SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
       job.addDiagnostic("Job received Kill while in RUNNING state.");
-      for (Vertex v : job.vertices.values()) {
-        job.eventHandler.handle(
-            new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)
-            );
-      }
+      job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL); // vertices are notified only if no cause was recorded yet
       // TODO Metrics
       //job.metrics.endRunningJob(job);
     }
+
+   
   }
 
   private static class VertexCompletedTransition implements
@@ -1051,7 +1126,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     @Override
     public DAGState transition(DAGImpl job, DAGEvent event) {
-
+      boolean forceTransitionToKillWait = false;
+      
       DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received a vertex completion event"
@@ -1063,10 +1139,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         vertexSucceeded(job, vertex);
         job.dagScheduler.vertexCompleted(vertex);
-      } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
+      } 
+      else if (vertexEvent.getVertexState() == VertexState.FAILED) {
+        job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
         vertexFailed(job, vertex);
-      } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
+        forceTransitionToKillWait = true;
+      } 
+      else if (vertexEvent.getVertexState() == VertexState.KILLED) {
         vertexKilled(job, vertex);
+        forceTransitionToKillWait = true;
       }
 
       if (LOG.isDebugEnabled()) {
@@ -1078,7 +1159,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             + ", numVertices=" + job.numVertices);
       }
 
-      return checkJobForCompletion(job);
+      // if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT.
+      DAGState state = checkJobForCompletion(job);
+      if(state == DAGState.RUNNING && forceTransitionToKillWait){
+        return DAGState.TERMINATING;
+      }
+      else {
+        return state;
+      }
     }
 
     private void vertexSucceeded(DAGImpl job, Vertex vertex) {
@@ -1106,6 +1194,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     diagnostics.add(diag);
   }
 
+  
+ 
+
   private static class DiagnosticsUpdateTransition implements
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2d81451..4aa0381 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -131,14 +132,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
-        TaskEventType.T_KILL, new KillNewTransition())
+            TaskEventType.T_TERMINATE, 
+            new KillNewTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
-         TaskEventType.T_KILL, KILL_TRANSITION)
+         TaskEventType.T_TERMINATE,
+             KILL_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED,
@@ -169,7 +172,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
-        TaskEventType.T_KILL, KILL_TRANSITION)
+        TaskEventType.T_TERMINATE, 
+        KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
     .addTransition(TaskStateInternal.KILL_WAIT,
@@ -180,7 +184,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(
         TaskStateInternal.KILL_WAIT,
         TaskStateInternal.KILL_WAIT,
-        EnumSet.of(TaskEventType.T_KILL,
+        EnumSet.of(
+            TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -205,13 +210,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
-        EnumSet.of(TaskEventType.T_KILL,
-                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+        EnumSet.of(
+            TaskEventType.T_TERMINATE,
+            TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
-        EnumSet.of(TaskEventType.T_KILL,
-                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+        EnumSet.of(
+            TaskEventType.T_TERMINATE,
+            TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables
     .installTopology();
@@ -687,7 +694,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         LOG.info(taskId + " Task Transitioned from " + oldState + " to "
             + getInternalState());
       }
-
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a912876..b8392ed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1,5 +1,4 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
+/* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
@@ -76,7 +75,9 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -86,6 +87,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
@@ -93,6 +95,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
@@ -194,8 +197,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_INIT,
               new InitTransition())
           .addTransition(VertexState.NEW, VertexState.KILLED,
-              VertexEventType.V_KILL,
-              new KillNewVertexTransition())
+              VertexEventType.V_TERMINATE,
+              new TerminateNewVertexTransition())
           .addTransition(VertexState.NEW, VertexState.ERROR,
               VertexEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
@@ -209,8 +212,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               new StartTransition())
 
           .addTransition(VertexState.INITED, VertexState.KILLED,
-              VertexEventType.V_KILL,
-              new KillInitedVertexTransition())
+              VertexEventType.V_TERMINATE,
+              new TerminateInitedVertexTransition())
           .addTransition(VertexState.INITED, VertexState.ERROR,
               VertexEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
@@ -225,11 +228,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition
               (VertexState.RUNNING,
               EnumSet.of(VertexState.RUNNING,
-                  VertexState.SUCCEEDED, VertexState.FAILED),
+                  VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition(VertexState.RUNNING, VertexState.KILL_WAIT,
-              VertexEventType.V_KILL, new KillTasksTransition())
+          .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
+              VertexEventType.V_TERMINATE,
+              new VertexKilledTransition())
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
               VertexEventType.V_TASK_RESCHEDULED,
               new TaskRescheduledTransition())
@@ -241,25 +245,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from KILL_WAIT state.
+          // Transitions from TERMINATING state.
           .addTransition
-              (VertexState.KILL_WAIT,
-              EnumSet.of(VertexState.KILL_WAIT, VertexState.KILLED),
+              (VertexState.TERMINATING,
+              EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+          .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               VertexEventType.V_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) // TODO shouldnt be done for KILL_WAIT vertex
-          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+          .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition(
-              VertexState.KILL_WAIT,
+              VertexState.TERMINATING,
               VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
-              EnumSet.of(VertexEventType.V_KILL,
+          .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
+              EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE))
 
@@ -270,7 +274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
-              EnumSet.of(VertexEventType.V_KILL,
+              EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
@@ -282,7 +286,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(VertexState.FAILED, VertexState.FAILED,
-              EnumSet.of(VertexEventType.V_KILL,
+              EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
@@ -294,7 +298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(VertexState.KILLED, VertexState.KILLED,
-              EnumSet.of(VertexEventType.V_KILL,
+              EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
@@ -304,7 +308,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.ERROR,
               VertexState.ERROR,
               EnumSet.of(VertexEventType.V_INIT,
-                  VertexEventType.V_KILL,
+                  VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
@@ -349,6 +353,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
   private final String javaOpts;
+  private VertexTerminationCause terminationCause; 
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, TezConfiguration conf, EventHandler eventHandler,
@@ -607,6 +612,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       readLock.unlock();
     }
   }
+  
+  /**
+   * Records the vertex's termination cause unless one is already recorded (the first cause wins).
+   * NOTE(review): no lock is taken here; presumably callers hold the write lock via a state transition — confirm.
+   * @param trigger the termination cause to record
+   * @return true if this call recorded {@code trigger}; false if a cause was already set
+   */
+  boolean trySetTerminationCause(VertexTerminationCause trigger) {
+    if(terminationCause == null){
+      terminationCause = trigger;
+      return true;
+    }
+    return false;
+  }
+  
+  public VertexTerminationCause getTerminationCause(){ // read under readLock; null while no cause has been recorded
+    readLock.lock();
+    try {
+      return terminationCause;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
   @Override
   public void scheduleTasks(Collection<TezTaskID> taskIDs) {
@@ -641,7 +669,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         eventHandler.handle(new VertexEvent(this.vertexId,
             VertexEventType.INTERNAL_ERROR));
       }
-      //notify the eventhandler of state change
+
       if (oldState != getInternalState()) {
         LOG.info(vertexId + " transitioned from " + oldState + " to "
                  + getInternalState());
@@ -715,57 +743,109 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   static VertexState checkVertexForCompletion(VertexImpl vertex) {
-    //check for vertex failure first
+    // Returns a terminal state once every task has completed; otherwise the current internal state.
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking for vertex completion"
           + ", failedTaskCount=" + vertex.failedTaskCount
           + ", killedTaskCount=" + vertex.killedTaskCount
           + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount);
+          + ", completedTaskCount=" + vertex.completedTaskCount
+          + ", terminationCause=" + vertex.terminationCause);
     }
 
-    if (vertex.failedTaskCount > 0) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed as tasks failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount;
-      LOG.info(diagnosticMsg);
-      vertex.addDiagnostic(diagnosticMsg);
-      vertex.abortVertex(VertexStatus.State.FAILED);
-      return vertex.finished(VertexState.FAILED);
+    // log in case of accounting error.
+    if (vertex.completedTaskCount > vertex.tasks.size()) {
+      LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+          + ", failedTaskCount=" + vertex.failedTaskCount
+          + ", killedTaskCount=" + vertex.killedTaskCount
+          + ", successfulTaskCount=" + vertex.succeededTaskCount
+          + ", completedTaskCount=" + vertex.completedTaskCount
+          + ", terminationCause=" + vertex.terminationCause);
     }
-
-    if(vertex.succeededTaskCount == vertex.tasks.size()) {
-      try {
-        if (!vertex.committed.getAndSet(true)) {
-          // commit only once
-          vertex.committer.commitVertex();
+    // The final state is decided only once every task has completed; the termination cause picks the outcome.
+    if (vertex.completedTaskCount == vertex.tasks.size()) {
+      //Only succeed if tasks complete successfully and no terminationCause is registered.
+      if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
+        try {
+          if (!vertex.committed.getAndSet(true)) {
+            // commit only once
+            vertex.committer.commitVertex();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+          vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE); // NOTE(review): unlike other failure paths, no diagnostic is added and abortVertex is not called
+          return vertex.finished(VertexState.FAILED);
         }
-      } catch (IOException e) {
-        LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+        return vertex.finished(VertexState.SUCCEEDED);
+      }
+      else if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
+            + "failedTasks:"
+            + vertex.failedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.addDiagnostic(diagnosticMsg);
+        vertex.abortVertex(VertexStatus.State.KILLED);
+        return vertex.finished(VertexState.KILLED);
+      }
+      else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex killed as other vertex failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.addDiagnostic(diagnosticMsg);
+        vertex.abortVertex(VertexStatus.State.KILLED);
+        return vertex.finished(VertexState.KILLED);
+      }
+      else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
+        if(vertex.failedTaskCount == 0){
+          LOG.error("task failure accounting error.  terminationCause=OWN_TASK_FAILURE but vertex.failedTaskCount == 0");
+        }
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed as one or more tasks failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.addDiagnostic(diagnosticMsg);
+        vertex.abortVertex(VertexStatus.State.FAILED);
         return vertex.finished(VertexState.FAILED);
       }
-      return vertex.finished(VertexState.SUCCEEDED);
-    }
-
-    if (vertex.completedTaskCount == vertex.tasks.size()) {
-      // this means the vertex has some killed tasks
-      assert vertex.killedTaskCount > 0;
-      vertex.setFinishTime();
-      vertex.abortVertex(VertexStatus.State.KILLED);
-      return vertex.finished(VertexState.KILLED);
+      else {
+        //should never occur: all tasks completed, not all succeeded, yet no (or an unhandled) termination cause
+        throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
+            + ", failedTaskCount=" + vertex.failedTaskCount
+            + ", killedTaskCount=" + vertex.killedTaskCount
+            + ", successfulTaskCount=" + vertex.succeededTaskCount
+            + ", completedTaskCount=" + vertex.completedTaskCount
+            + ", terminationCause=" + vertex.terminationCause);
+      }
     }
 
     //return the current state, Vertex not finished yet
     return vertex.getInternalState();
   }
 
+  /**
+   * Records {@code trigger} if no cause is set yet and, only in that case, sends a
+   * TaskEventTermination to every task of this vertex, so tasks are notified at most once.
+   * @param trigger why the vertex is moving towards KILLED/FAILED
+   * @param taskterminationCause the cause carried by each termination event sent to the tasks
+   */
+  void enactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
+    if(trySetTerminationCause(trigger)){
+      for (Task task : tasks.values()) {
+        eventHandler.handle(
+            new TaskEventTermination(task.getTaskId(), taskterminationCause));
+      }
+    }
+  }
+  
   VertexState finished(VertexState finalState) {
     if (finishTime == 0) setFinishTime();
 
     switch (finalState) {
       case KILLED:
-      case KILL_WAIT:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState));
         logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
@@ -810,6 +890,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         if (vertex.numTasks == 0) {
           vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+          vertex.trySetTerminationCause(VertexTerminationCause.ZERO_TASKS);
           vertex.abortVertex(VertexStatus.State.FAILED);
           return vertex.finished(VertexState.FAILED);
         }
@@ -871,6 +952,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.warn("Vertex init failed", e);
         vertex.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
+        vertex.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
         vertex.abortVertex(VertexStatus.State.FAILED);
         // TODO: Metrics
         //job.metrics.endPreparingJob(vertex);
@@ -1013,35 +1095,45 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
-  private static class KillNewVertexTransition
+  private static class TerminateNewVertexTransition
   implements SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
+      // Record why the vertex is terminating, then finish directly as KILLED.
+      // Unlike the INITED arc, abortVertex is not called here.
+      // NOTE(review): unchecked cast -- assumes every event routed to this arc
+      // is a VertexEventTermination; confirm against the state-machine table.
+      VertexEventTermination vet = (VertexEventTermination) event;
+      vertex.trySetTerminationCause(vet.getTerminationCause());
       vertex.setFinishTime();
       vertex.addDiagnostic("Vertex received Kill in NEW state.");
       vertex.finished(VertexState.KILLED);
     }
   }
 
-  private static class KillInitedVertexTransition
+  private static class TerminateInitedVertexTransition
   implements SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
+      // Record the termination cause, call abortVertex(KILLED), then finish
+      // the vertex as KILLED.
+      // NOTE(review): unchecked cast -- assumes this arc only receives
+      // VertexEventTermination; confirm against the state-machine table.
+      VertexEventTermination vet = (VertexEventTermination) event;
+      vertex.trySetTerminationCause(vet.getTerminationCause());
       vertex.abortVertex(VertexStatus.State.KILLED);
       vertex.addDiagnostic("Vertex received Kill in INITED state.");
       vertex.finished(VertexState.KILLED);
     }
   }
 
-  private static class KillTasksTransition
+  private static class VertexKilledTransition
       implements SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
-      for (Task task : vertex.tasks.values()) {
-        vertex.eventHandler.handle(
-            new TaskEvent(task.getTaskId(), TaskEventType.T_KILL));
+      // NOTE(review): the diagnostic above is recorded for every trigger,
+      // including failure-driven terminations, not only DAG kills.
+      // Translate the vertex-level termination cause into the cause reported to
+      // each task. A vertex terminating because of its own task failure reports
+      // OTHER_TASK_FAILURE to its remaining tasks. enactKill only sends the task
+      // kills when trySetTerminationCause(trigger) succeeds.
+      VertexEventTermination vet = (VertexEventTermination) event;
+      VertexTerminationCause trigger = vet.getTerminationCause();
+      switch(trigger){
+        case DAG_KILL : vertex.enactKill(trigger, TaskTerminationCause.DAG_KILL); break;
+        case OTHER_VERTEX_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
+        case OWN_TASK_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
+        default://should not occur
+          throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
       }
+
       // TODO: Metrics
       //job.metrics.endRunningJob(job);
     }
@@ -1153,6 +1245,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
       LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
@@ -1160,6 +1253,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
         taskSucceeded(vertex, task);
       } else if (taskEvent.getState() == TaskState.FAILED) {
+        vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
+        forceTransitionToKillWait = true;
         taskFailed(vertex, task);
       } else if (taskEvent.getState() == TaskState.KILLED) {
         taskKilled(vertex, task);
@@ -1167,6 +1262,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
       vertex.vertexScheduler.onVertexCompleted();
       VertexState state = VertexImpl.checkVertexForCompletion(vertex);
+      if(state == VertexState.RUNNING && forceTransitionToKillWait){
+        return VertexState.TERMINATING;
+      }
+      
       if(state == VertexState.SUCCEEDED) {
         vertex.vertexScheduler.onVertexCompleted();
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 421d639..a2585c6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -44,9 +44,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
@@ -82,6 +85,7 @@ public class TestDAGImpl {
   private AppContext appContext;
   private ApplicationAttemptId appAttemptId;
   private DAGImpl dag;
+  private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
   private TaskAttemptListener taskAttemptListener;
@@ -120,6 +124,18 @@ public class TestDAGImpl {
     }
   }
 
+  /**
+   * Test dispatcher that forwards each TaskEvent to the Task it targets,
+   * resolving the task through dag when the event's DAG id matches dagId,
+   * and through mrrDag otherwise.
+   */
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskEvent event) {
+      boolean isPrimaryDag =
+          event.getTaskID().getVertexID().getDAGId().equals(dagId);
+      DAGImpl owner = isPrimaryDag ? dag : mrrDag;
+      Task target = owner.getVertex(event.getTaskID().getVertexID())
+          .getTask(event.getTaskID());
+      ((EventHandler<TaskEvent>) target).handle(event);
+    }
+  }
+  
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -483,6 +499,8 @@ public class TestDAGImpl {
         mrrAppContext);
     doReturn(mrrDag).when(mrrAppContext).getDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
+    taskEventDispatcher = new TaskEventDispatcher();
+    dispatcher.register(TaskEventType.class, taskEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
@@ -613,13 +631,13 @@ public class TestDAGImpl {
     dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
     dispatcher.await();
 
-    Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+    Assert.assertEquals(DAGState.TERMINATING, dag.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
-    Assert.assertEquals(VertexState.KILL_WAIT, v1.getState());
+    Assert.assertEquals(VertexState.TERMINATING, v1.getState());
     for (int i = 2 ; i < 6; ++i ) {
       TezVertexID vId = new TezVertexID(dagId, i);
       Vertex v = dag.getVertex(vId);
-      Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+      Assert.assertEquals(VertexState.KILLED, v.getState());
     }
     Assert.assertEquals(1, dag.getSuccessfulVertices());
   }
@@ -666,7 +684,6 @@ public class TestDAGImpl {
 
   @SuppressWarnings("unchecked")
   @Test
-  @Ignore
   public void testVertexFailureHandling() {
     initDAG(dag);
     startDAG(dag);
@@ -689,13 +706,19 @@ public class TestDAGImpl {
     for (int i = 3; i < 6; ++i) {
       TezVertexID vId = new TezVertexID(dagId, i);
       Vertex v = dag.getVertex(vId);
-      Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+      Assert.assertEquals(VertexState.KILLED, v.getState());
     }
   }
 
+  // A dag.kill() on an active DAG races with all of its vertices succeeding.
+  // If a DAG_KILL is processed while the DAG is in the RUNNING state, the DAG
+  // should end in KILLED regardless of whether all vertices complete.
+  //
+  // Final state:
+  //   DAG is in KILLED state, with terminationCause = DAG_KILL.
+  //   Each vertex had a kill triggered (terminationCause = DAG_KILL) but raced
+  //   ahead and ended in SUCCEEDED state.
   @SuppressWarnings("unchecked")
   @Test
-  @Ignore
   public void testDAGKill() {
     initDAG(dag);
     startDAG(dag);
@@ -708,19 +731,23 @@ public class TestDAGImpl {
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(
-        new DAGEvent(dagId, DAGEventType.DAG_KILL));
-
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
     for (int i = 2; i < 6; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           new TezVertexID(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
     Assert.assertEquals(6, dag.getSuccessfulVertices());
+    for (Vertex v : dag.getVertices().values()) {
+      Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+    }
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  // A DAG kill races with most vertices succeeding and one vertex being killed directly.
+  // Because the dag.kill() happens before the direct kill, that vertex has terminationCause=DAG_KILL.
   @SuppressWarnings("unchecked")
   @Test
   public void testDAGKillPending() {
@@ -735,21 +762,21 @@ public class TestDAGImpl {
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(
-        new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
 
     for (int i = 2; i < 5; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           new TezVertexID(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
-    Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 5), VertexState.KILLED));
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     Assert.assertEquals(5, dag.getSuccessfulVertices());
+    Assert.assertEquals(dag.getVertex(new TezVertexID(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 55d4354..6ed9e83 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -47,9 +47,11 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -128,7 +130,7 @@ public class TestTaskImpl {
   }
 
   private void killTask(TezTaskID taskId) {
-    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    // Deliver a DAG_KILL termination to the task, then expect its internal
+    // state to be KILL_WAIT (checked by assertTaskKillWaitState).
+    mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
     assertTaskKillWaitState();
   }
 
@@ -193,7 +195,7 @@ public class TestTaskImpl {
   }
 
   /**
-   * {@link TaskState#KILL_WAIT}
+   * {@link TaskStateInternal#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
     assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 10e92d5..15c8a9b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -59,7 +59,9 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
@@ -70,6 +72,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
@@ -105,6 +108,7 @@ public class TestVertexImpl {
   private TezConfiguration conf = new TezConfiguration();
   private Map<String, EdgeProperty> edges;
 
+  private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
 
   private DagEventDispatcher dagEventDispatcher;
@@ -156,12 +160,16 @@ public class TestVertexImpl {
     }
   }
 
-  private class TaskEventHandler implements EventHandler<TaskEvent> {
+  // Test dispatcher that forwards each TaskEvent to its target Task, looked up
+  // through the vertex registered in vertexIdMap.
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskEvent event) {
+      // NOTE(review): vertexIdMap.get returns null for an unregistered vertex,
+      // which would NPE below -- assumes test setup registers every vertex.
+      VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID());
+      Task task = vertex.getTask(event.getTaskID());
+      ((EventHandler<TaskEvent>)task).handle(event);
     }
   }
-
+  
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     public Map<DAGEventType, Integer> eventCount =
         new HashMap<DAGEventType, Integer>();
@@ -221,6 +229,7 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
+ 
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -511,13 +520,13 @@ public class TestVertexImpl {
     edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(
         dagPlan.getEdgeList());
     parseVertexEdges();
+    taskEventDispatcher = new TaskEventDispatcher();
+    dispatcher.register(TaskEventType.class, taskEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
-    dispatcher.register(HistoryEventType.class,
-        new HistoryHandler());
-    dispatcher.register(TaskEventType.class, new TaskEventHandler());
+    dispatcher.register(HistoryEventType.class, new HistoryHandler());
     dispatcher.init(conf);
     dispatcher.start();
 
@@ -558,15 +567,12 @@ public class TestVertexImpl {
   }
 
   @SuppressWarnings("unchecked")
-  private void killVertex(VertexImpl v, boolean checkKillWait) {
+  private void killVertex(VertexImpl v) {
+    // Send a DAG_KILL termination to the vertex and wait for the dispatcher
+    // to drain, then verify the vertex is KILLED with the DAG_KILL cause.
     dispatcher.getEventHandler().handle(
-        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
     dispatcher.await();
-    if (checkKillWait) {
-      Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
-    } else {
-      Assert.assertEquals(VertexState.KILLED, v.getState());
-    }
+    Assert.assertEquals(VertexState.KILLED, v.getState());
+    // JUnit's assertEquals is (expected, actual); keep that order so failure
+    // messages report the values the right way round.
+    Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
   }
 
   @SuppressWarnings("unchecked")
@@ -707,6 +713,7 @@ public class TestVertexImpl {
         new VertexEventTaskCompleted(t1, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
     String diagnostics =
         StringUtils.join(",", v.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
@@ -715,7 +722,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testVertexKillDiagnostics() {
     VertexImpl v1 = vertices.get("vertex1");
-    killVertex(v1, false);
+    killVertex(v1);
     String diagnostics =
         StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains(
@@ -723,7 +730,7 @@ public class TestVertexImpl {
 
     VertexImpl v2 = vertices.get("vertex2");
     initVertex(v2);
-    killVertex(v2, false);
+    killVertex(v2);
     diagnostics =
         StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
     LOG.info("diagnostics v2: " + diagnostics);
@@ -740,7 +747,7 @@ public class TestVertexImpl {
     initVertex(v6);
 
     startVertex(v3);
-    killVertex(v3, true);
+    killVertex(v3);
     diagnostics =
         StringUtils.join(",", v3.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains(
@@ -756,15 +763,15 @@ public class TestVertexImpl {
     startVertex(v);
 
     dispatcher.getEventHandler().handle(
-        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
     dispatcher.await();
-    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+    Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
             new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
     dispatcher.await();
-    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+    Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
@@ -774,8 +781,7 @@ public class TestVertexImpl {
   }
 
   @SuppressWarnings("unchecked")
-  @Test(timeout = 5000)
-  @Ignore
+  @Test
   public void testVertexKill() {
     initAllVertices();
 
@@ -783,13 +789,15 @@ public class TestVertexImpl {
     startVertex(v);
 
     dispatcher.getEventHandler().handle(
-        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
-    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+        new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
             new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
-    Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+    dispatcher.await();
+    Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
@@ -800,7 +808,6 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  @Ignore
   public void testKilledTasksHandling() {
     initAllVertices();
 
@@ -814,6 +821,7 @@ public class TestVertexImpl {
         new VertexEventTaskCompleted(t1, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
     Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState());
   }
 
@@ -869,6 +877,7 @@ public class TestVertexImpl {
         new VertexEventTaskCompleted(t2, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
     Assert.assertEquals(0, committer.commitCounter);
     Assert.assertEquals(1, committer.abortCounter);
   }
@@ -970,6 +979,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
     Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
+    
   }
 
   @SuppressWarnings("unchecked")
@@ -1099,6 +1109,7 @@ public class TestVertexImpl {
         new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause());
     Assert.assertEquals(1, committer.commitCounter);
 
     // FIXME need to verify whether abort needs to be called if commit fails

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 5b79d3d..51341a2 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -60,6 +60,7 @@ import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
 import org.apache.tez.engine.lib.input.ShuffledMergedInput;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
@@ -173,13 +174,34 @@ public class TestMRRJobsDAGApi {
   // client.
   @Test(timeout = 60000)
   public void testMRRSleepJobDagSubmit() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    // testMRRSleepJobDagSubmitCore returns State.ERROR when the app jar is
+    // missing, which cannot be told apart from a genuine DAG error. Check up
+    // front so a missing jar skips this test instead of failing the
+    // SUCCEEDED assertion below.
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+    State finalState = testMRRSleepJobDagSubmitCore(false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+  
+  // Submits a simple 3 stage sleep job using the DAG submit API instead of job
+  // client, kills the DAG via DAGClient.tryKillDAG() once it is RUNNING, and
+  // expects the DAG to finish in the KILLED state.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    // testMRRSleepJobDagSubmitCore returns State.ERROR when the app jar is
+    // missing; check up front so a missing jar skips this test instead of
+    // failing the KILLED assertion below.
+    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+    State finalState = testMRRSleepJobDagSubmitCore(true);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+  
+  public State testMRRSleepJobDagSubmitCore(boolean killDagWhileRunning) throws IOException,
       InterruptedException, TezException, ClassNotFoundException, YarnException {
     LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
 
     if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
           + " not found. Not running test.");
-      return;
+      return State.ERROR;
     }
 
     JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
@@ -368,6 +390,7 @@ public class TestMRRJobsDAGApi {
         null, "default", Collections.singletonList(""), commonEnv,
         amLocalResources, new TezConfiguration());
 
+    
     DAGStatus dagStatus = dagClient.getDAGStatus();
     while (!dagStatus.isCompleted()) {
       LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: "
@@ -375,12 +398,13 @@ public class TestMRRJobsDAGApi {
       // TODO The test will fail if the AM sleep is removed. TEZ-207 to fix
       // this.
       Thread.sleep(500l);
+      if(killDagWhileRunning && dagStatus.getState() == DAGStatus.State.RUNNING){
+        dagClient.tryKillDAG();
+        dagStatus = dagClient.getDAGStatus();
+      }
       dagStatus = dagClient.getDAGStatus();
     }
-
-    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    // TODO Add additional checks for tracking URL etc. - once it's exposed by
-    // the DAG API.
+    return dagStatus.getState();
   }
 
   private static LocalResource createLocalResource(FileSystem fc, Path file,