You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/07/09 02:48:50 UTC
[2/2] git commit: TEZ-141; TEZ-143; TEZ-148; TEZ-283;
TEZ-284: Improve failure/kill handling, track TerminationCauses, add
dag.kill()
TEZ-141;TEZ-143;TEZ-148;TEZ-283;TEZ-284: Improve failure/kill handling, track TerminationCauses, add dag.kill()
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/999d16ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/999d16ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/999d16ae
Branch: refs/heads/master
Commit: 999d16ae1022884db4a17e2b803abe52ea82fcf0
Parents: 92228ba
Author: Mike Liddell <ml...@apache.org>
Authored: Mon Jul 8 17:48:18 2013 -0700
Committer: Mike Liddell <ml...@apache.org>
Committed: Mon Jul 8 17:48:18 2013 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/client/DAGClient.java | 7 +
.../apache/tez/dag/api/client/DAGStatus.java | 3 +-
.../apache/tez/dag/api/client/VertexStatus.java | 5 +-
.../dag/api/client/rpc/DAGClientRPCImpl.java | 18 ++
tez-dag-api/src/main/proto/DAGApiRecords.proto | 2 +
.../src/main/proto/DAGClientAMProtocol.proto | 10 +
.../tez/dag/api/client/DAGStatusBuilder.java | 3 +-
.../tez/dag/api/client/VertexStatusBuilder.java | 3 +-
...DAGClientAMProtocolBlockingPBServerImpl.java | 15 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 8 +-
.../java/org/apache/tez/dag/app/dag/DAG.java | 4 +-
.../org/apache/tez/dag/app/dag/DAGState.java | 2 +-
.../tez/dag/app/dag/DAGTerminationCause.java | 38 ++++
.../tez/dag/app/dag/TaskTerminationCause.java | 36 ++++
.../java/org/apache/tez/dag/app/dag/Vertex.java | 1 +
.../org/apache/tez/dag/app/dag/VertexState.java | 2 +-
.../tez/dag/app/dag/VertexTerminationCause.java | 43 ++++
.../dag/app/dag/event/TaskEventTermination.java | 37 ++++
.../tez/dag/app/dag/event/TaskEventType.java | 4 +-
.../app/dag/event/VertexEventTermination.java | 43 ++++
.../tez/dag/app/dag/event/VertexEventType.java | 3 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 187 +++++++++++-----
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 24 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 213 ++++++++++++++-----
.../tez/dag/app/dag/impl/TestDAGImpl.java | 51 +++--
.../tez/dag/app/dag/impl/TestTaskImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 59 ++---
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 34 ++-
.../tez/mapreduce/hadoop/TestMRHelpers.java | 12 +-
29 files changed, 699 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 7ec4e22..3f5d07a 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -57,4 +57,11 @@ public interface DAGClient extends Closeable {
*/
public VertexStatus getVertexStatus(String vertexName)
throws IOException, TezException;
+
+ /**
+ * Kill a running DAG
+ *
+ */
+ public void tryKillDAG()
+ throws TezException, IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 8249626..c28f2bf 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -35,7 +35,7 @@ public class DAGStatus {
SUCCEEDED,
KILLED,
FAILED,
- ERROR
+ ERROR,
};
DAGStatusProtoOrBuilder proxy = null;
@@ -52,6 +52,7 @@ public class DAGStatus {
return DAGStatus.State.SUBMITTED;
case DAG_INITING:
return DAGStatus.State.INITING;
+ case DAG_TERMINATING: // For simplicity, DAG_TERMINATING is presented to user as 'still running'
case DAG_RUNNING:
return DAGStatus.State.RUNNING;
case DAG_SUCCEEDED:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index 185bc43..ce5dbe0 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -31,7 +31,8 @@ public class VertexStatus {
SUCCEEDED,
KILLED,
FAILED,
- ERROR
+ ERROR,
+ TERMINATING,
};
VertexStatusProtoOrBuilder proxy = null;
@@ -55,6 +56,8 @@ public class VertexStatus {
return VertexStatus.State.KILLED;
case VERTEX_ERROR:
return VertexStatus.State.ERROR;
+ case VERTEX_TERMINATING:
+ return VertexStatus.State.TERMINATING;
default:
throw new TezUncheckedException("Unsupported value for VertexStatus.State : " +
proxy.getState());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 4a4163c..b405de3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
@@ -98,6 +99,21 @@ public class DAGClientRPCImpl implements DAGClient {
// need AM for this. Later maybe from History
return null;
}
+
+ public void tryKillDAG() throws TezException, IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
+ }
+ if(createAMProxyIfNeeded()) {
+ TryKillDAGRequestProto requestProto =
+ TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+ try {
+ proxy.tryKillDAG(null, requestProto);
+ } catch (ServiceException e) {
+ resetProxy(e);
+ }
+ }
+ }
@Override
public void close() throws IOException {
@@ -137,6 +153,8 @@ public class DAGClientRPCImpl implements DAGClient {
}
}
+
+
DAGStatus getDAGStatusViaRM() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
index b062c15..b5fc5b7 100644
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-dag-api/src/main/proto/DAGApiRecords.proto
@@ -141,6 +141,7 @@ enum VertexStatusStateProto {
VERTEX_KILLED = 3;
VERTEX_FAILED = 4;
VERTEX_ERROR = 5;
+ VERTEX_TERMINATING = 6;
}
message VertexStatusProto {
@@ -157,6 +158,7 @@ enum DAGStatusStateProto {
DAG_KILLED = 4;
DAG_FAILED = 5;
DAG_ERROR = 6;
+ DAG_TERMINATING = 7;
}
message StringProgressPairProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
index 18247a1..dd8f3d5 100644
--- a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
@@ -49,8 +49,18 @@ message GetVertexStatusResponseProto {
optional VertexStatusProto vertexStatus = 1;
}
+message TryKillDAGRequestProto {
+ optional string dagId = 1;
+}
+
+message TryKillDAGResponseProto {
+ //nothing yet
+}
+
+
service DAGClientAMProtocol {
rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
+ rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
index 3247d43..2b0f543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
@@ -71,8 +71,9 @@ public class DAGStatusBuilder extends DAGStatus {
case FAILED:
return DAGStatusStateProto.DAG_FAILED;
case KILLED:
- case KILL_WAIT:
return DAGStatusStateProto.DAG_KILLED;
+ case TERMINATING:
+ return DAGStatusStateProto.DAG_TERMINATING;
case ERROR:
return DAGStatusStateProto.DAG_ERROR;
default:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 606d8da..66de71f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -63,8 +63,9 @@ public class VertexStatusBuilder extends VertexStatus {
case FAILED:
return VertexStatusStateProto.VERTEX_FAILED;
case KILLED:
- case KILL_WAIT:
return VertexStatusStateProto.VERTEX_KILLED;
+ case TERMINATING:
+ return VertexStatusStateProto.VERTEX_TERMINATING;
case ERROR:
return VertexStatusStateProto.VERTEX_ERROR;
default:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 2b9c723..1d12ffe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -32,6 +32,8 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGResponseProto;
import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
import com.google.protobuf.RpcController;
@@ -89,6 +91,19 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
}
}
+ @Override
+ public TryKillDAGResponseProto tryKillDAG(RpcController controller,
+ TryKillDAGRequestProto request) throws ServiceException {
+ try {
+ String dagId = request.getDagId();
+ real.tryKillDAG(dagId);
+ return TryKillDAGResponseProto.newBuilder().build();
+ } catch (TezException e) {
+ throw wrapException(e);
+ }
+ }
+
+
ServiceException wrapException(Exception e){
return new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index bc14f90..c276e4f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -695,7 +695,7 @@ public class DAGAppMaster extends CompositeService {
state = DAGAppMasterState.FAILED;
break;
case KILLED:
- state = DAGAppMasterState.FAILED;
+ state = DAGAppMasterState.KILLED;
case ERROR:
state = DAGAppMasterState.ERROR;
default:
@@ -736,6 +736,12 @@ public class DAGAppMaster extends CompositeService {
}
return dag;
}
+
+ public void tryKillDAG(String dagIdStr)
+ throws TezException {
+ DAG dag = getDAG(dagIdStr);
+ dag.tryKill();
+ }
}
private class RunningAppContext implements AppContext {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 913b565..144dd24 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -32,7 +32,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
/**
- * Main interface to interact with the job. Provides only getters.
+ * Main interface to interact with the job.
*/
public interface DAG {
@@ -77,5 +77,5 @@ public interface DAG {
//List<AMInfo> getAMInfos();
boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
-
+ void tryKill();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
index a7de770..b83e9a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
@@ -25,5 +25,5 @@ public enum DAGState {
FAILED,
KILLED,
ERROR,
- KILL_WAIT
+ TERMINATING
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
new file mode 100644
index 0000000..05f15f3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+
+/**
+ * Represents proximate cause of a DAG transition to FAILED or KILLED.
+ */
+public enum DAGTerminationCause {
+
+ /** DAG was directly killed. */
+ DAG_KILL,
+
+ /** A vertex failed. */
+ VERTEX_FAILURE,
+
+ /** DAG failed as it had zero vertices. */
+ ZERO_VERTICES,
+
+ /** DAG failed during init. */
+ INIT_FAILURE,
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
new file mode 100644
index 0000000..73741f0
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+
+/**
+ * Represents proximate cause of a Task transition to FAILED or KILLED.
+ */
+public enum TaskTerminationCause {
+
+ /** DAG was killed */
+ DAG_KILL,
+
+ /** Other vertex failed causing DAG to fail thus killing the parent vertex */
+ OTHER_VERTEX_FAILURE,
+
+ /** One of the tasks for the parent vertex failed. */
+ OTHER_TASK_FAILURE,
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f55dff5..9c4d3b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -81,4 +81,5 @@ public interface Vertex extends Comparable<Vertex> {
Resource getTaskResource();
public DAG getDAG();
+ VertexTerminationCause getTerminationCause();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 9b0e19a..887c0d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -25,5 +25,5 @@ public enum VertexState {
FAILED,
KILLED,
ERROR,
- KILL_WAIT,
+ TERMINATING,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
new file mode 100644
index 0000000..138ee70
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+/**
+ * Represents proximate cause of a Vertex transition to FAILED or KILLED.
+ */
+public enum VertexTerminationCause {
+
+ /** DAG was killed */
+ DAG_KILL,
+
+ /** Other vertex failed causing DAG to fail thus killing this vertex */
+ OTHER_VERTEX_FAILURE,
+
+ /** One of the tasks for this vertex failed. */
+ OWN_TASK_FAILURE,
+
+ /** This vertex failed during commit. */
+ COMMIT_FAILURE,
+
+ /** This vertex failed as it had zero tasks. */
+ ZERO_TASKS,
+
+ /** This vertex failed during init. */
+ INIT_FAILURE
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
new file mode 100644
index 0000000..130e86f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TezTaskID;
+
+public class TaskEventTermination extends TaskEvent {
+
+ private TaskTerminationCause terminationCause;
+
+ public TaskEventTermination(TezTaskID taskID, TaskTerminationCause terminationCause) {
+ super(taskID, TaskEventType.T_TERMINATE);
+ this.terminationCause = terminationCause;
+ }
+
+ public TaskTerminationCause getTerminationCause() {
+ return terminationCause;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index d2e492e..bc7f3ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -24,8 +24,8 @@ package org.apache.tez.dag.app.dag.event;
public enum TaskEventType {
//Producer:Client, Job
- T_KILL,
-
+ T_TERMINATE,
+
//Producer:Job
T_SCHEDULE,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
new file mode 100644
index 0000000..647f600
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTermination.java
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Specialized VertexEvent for termination conditions.
+ *
+ * The VertexEventType is always V_TERMINATE.
+ * The terminationCause denotes the specific reason why the vertex is being terminated.
+ */
+public class VertexEventTermination extends VertexEvent {
+
+ private VertexTerminationCause terminationCause;
+
+ public VertexEventTermination(TezVertexID vertexId, VertexTerminationCause terminationCause) {
+ super(vertexId, VertexEventType.V_TERMINATE);
+ this.terminationCause = terminationCause;
+ }
+
+ public VertexTerminationCause getTerminationCause() {
+ return terminationCause;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 7e874d0..43cffe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -24,7 +24,7 @@ package org.apache.tez.dag.app.dag.event;
public enum VertexEventType {
//Producer:Client, Job
- V_KILL,
+ V_TERMINATE,
//Producer:Job
V_INIT,
@@ -56,4 +56,5 @@ public enum VertexEventType {
V_DIAGNOSTIC_UPDATE,
INTERNAL_ERROR,
V_COUNTER_UPDATE,
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 9e4f00b..be25c66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -65,10 +65,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -78,8 +80,11 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
@@ -194,11 +199,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Transitions from RUNNING state
.addTransition
(DAGState.RUNNING,
- EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
+ EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
DAGEventType.DAG_VERTEX_COMPLETED,
new VertexCompletedTransition())
- .addTransition(DAGState.RUNNING, DAGState.KILL_WAIT,
- DAGEventType.DAG_KILL, new KillVerticesTransition())
+ .addTransition(DAGState.RUNNING, DAGState.TERMINATING,
+ DAGEventType.DAG_KILL, new DAGKilledTransition())
.addTransition(DAGState.RUNNING, DAGState.RUNNING,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
@@ -212,21 +217,25 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Transitions from KILL_WAIT state.
+ // Transitions from TERMINATING state.
.addTransition
- (DAGState.KILL_WAIT,
- EnumSet.of(DAGState.KILL_WAIT, DAGState.KILLED),
+ (DAGState.TERMINATING,
+ EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED),
DAGEventType.DAG_VERTEX_COMPLETED,
new VertexCompletedTransition())
- .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+ .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
+ .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
- DAGState.KILL_WAIT,
+ DAGState.TERMINATING,
DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+
+ // Ignore-able events
+ .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
+ EnumSet.of(DAGEventType.DAG_KILL))
// Transitions from SUCCEEDED state
.addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
@@ -296,7 +305,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private int numFailedVertices = 0;
private int numKilledVertices = 0;
private boolean isUber = false;
-
+ private DAGTerminationCause terminationCause;
private Credentials fsTokens;
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
@@ -484,6 +493,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.readLock.unlock();
}
}
+
+ @Override
+ public void tryKill() {
+ //send a DAG_KILL message
+ eventHandler.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ }
@Override
public Map<TezVertexID, Vertex> getVertices() {
@@ -647,37 +662,55 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+ ", numFailedVertices=" + dag.numFailedVertices
+ ", numKilledVertices=" + dag.numKilledVertices
- + ", numVertices=" + dag.numVertices);
- }
-
- if (dag.numFailedVertices > 0) {
- dag.setFinishTime();
- String diagnosticMsg = "DAG failed as vertices failed. " +
- " failedVertices:" + dag.numFailedVertices +
- " killedVertices:" + dag.numKilledVertices;
- LOG.info(diagnosticMsg);
- dag.addDiagnostic(diagnosticMsg);
- dag.abortJob(DAGStatus.State.FAILED);
- return dag.finished(DAGState.FAILED);
+ + ", numVertices=" + dag.numVertices
+ + ", terminationCause=" + dag.terminationCause);
}
- if(dag.numSuccessfulVertices == dag.numVertices) {
- dag.setFinishTime();
- dag.logJobHistoryFinishedEvent();
- return dag.finished(DAGState.SUCCEEDED);
+ // log in case of accounting error.
+ if (dag.numCompletedVertices > dag.numVertices) {
+ LOG.error("vertex completion accounting issue: numCompletedVertices > numVertices"
+ + ", numCompletedVertices=" + dag.numCompletedVertices
+ + ", numVertices=" + dag.numVertices
+ );
}
-
+
if (dag.numCompletedVertices == dag.numVertices) {
- // this means the dag has some killed vertices
- String diagnosticMsg = "DAG killed. " +
- " failedVertices:" + dag.numFailedVertices +
- " killedVertices:" + dag.numKilledVertices;
- LOG.info(diagnosticMsg);
- dag.addDiagnostic(diagnosticMsg);
- assert dag.numKilledVertices > 0;
- dag.setFinishTime();
- dag.abortJob(DAGStatus.State.KILLED);
- return dag.finished(DAGState.KILLED);
+ //Only succeed if vertices complete successfully and no terminationCause is registered.
+ if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
+ dag.setFinishTime();
+ dag.logJobHistoryFinishedEvent();
+ return dag.finished(DAGState.SUCCEEDED);
+ }
+ else if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
+ dag.setFinishTime();
+ String diagnosticMsg = "DAG killed due to user-initiated kill." +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ dag.abortJob(DAGStatus.State.KILLED);
+ return dag.finished(DAGState.KILLED);
+ }
+ if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
+ dag.setFinishTime();
+ String diagnosticMsg = "DAG failed due to vertex failure." +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ dag.abortJob(DAGStatus.State.FAILED);
+ return dag.finished(DAGState.FAILED);
+ }
+ else {
+ // should never get here.
+ throw new TezUncheckedException("All vertices complete, but cannot determine final state of DAG"
+ + ", numCompletedVertices=" + dag.numCompletedVertices
+ + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+ + ", numFailedVertices=" + dag.numFailedVertices
+ + ", numKilledVertices=" + dag.numKilledVertices
+ + ", numVertices=" + dag.numVertices
+ + ", terminationCause=" + dag.terminationCause);
+ }
}
//return the current state, Job not finished yet
@@ -745,6 +778,29 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
readLock.unlock();
}
}
+
+ /**
+ * Set the terminationCause if it had not yet been set.
+ *
+ * @param trigger The trigger
+ * @return true if setting the value succeeded.
+ */
+ boolean trySetTerminationCause(DAGTerminationCause trigger) {
+ if(terminationCause == null){
+ terminationCause = trigger;
+ return true;
+ }
+ return false;
+ }
+
+ DAGTerminationCause getTerminationCause() {
+ readLock.lock();
+ try {
+ return terminationCause;
+ } finally {
+ readLock.unlock();
+ }
+ }
/*
* (non-Javadoc)
@@ -755,7 +811,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// TODO ApplicationACLs
return null;
}
-
+
// TODO Recovery
/*
@Override
@@ -786,6 +842,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.numVertices = dag.getJobPlan().getVertexCount();
if (dag.numVertices == 0) {
dag.addDiagnostic("No vertices for dag");
+ dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
dag.abortJob(DAGStatus.State.FAILED);
return dag.finished(DAGState.FAILED);
}
@@ -817,6 +874,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.warn("Job init failed", e);
dag.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
+ dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
dag.abortJob(DAGStatus.State.FAILED);
// TODO Metrics
//dag.metrics.endPreparingJob(dag);
@@ -1008,6 +1066,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.fullCounters.incrAllCounters(v.getAllCounters());
}
}
+
+ /**
+ * Set the terminationCause and send a kill-message to all vertices.
+ * The vertex-kill messages are only sent once.
+ * @param dagTerminationCause the trigger that is causing the DAG to transition to KILLED/FAILED
+ * @param vertexTerminationCause the termination cause to send to each vertex.
+ */
+ void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
+
+ if(trySetTerminationCause(dagTerminationCause)){
+ for (Vertex v : vertices.values()) {
+ eventHandler.handle(
+ new VertexEventTermination(v.getVertexId(), vertexTerminationCause)
+ );
+ }
+ }
+ }
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
@@ -1017,6 +1092,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public void transition(DAGImpl job, DAGEvent event) {
job.setFinishTime();
job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.KILLED);
+ job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
job.finished(DAGState.KILLED);
}
}
@@ -1025,25 +1101,24 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
implements SingleArcTransition<DAGImpl, DAGEvent> {
@Override
public void transition(DAGImpl job, DAGEvent event) {
+ job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
job.abortJob(DAGStatus.State.KILLED);
job.addDiagnostic("Job received Kill in INITED state.");
job.finished(DAGState.KILLED);
}
}
- private static class KillVerticesTransition
+ private static class DAGKilledTransition
implements SingleArcTransition<DAGImpl, DAGEvent> {
@Override
public void transition(DAGImpl job, DAGEvent event) {
job.addDiagnostic("Job received Kill while in RUNNING state.");
- for (Vertex v : job.vertices.values()) {
- job.eventHandler.handle(
- new VertexEvent(v.getVertexId(), VertexEventType.V_KILL)
- );
- }
+ job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
// TODO Metrics
//job.metrics.endRunningJob(job);
}
+
+
}
private static class VertexCompletedTransition implements
@@ -1051,7 +1126,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public DAGState transition(DAGImpl job, DAGEvent event) {
-
+ boolean forceTransitionToKillWait = false;
+
DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
if (LOG.isDebugEnabled()) {
LOG.debug("Received a vertex completion event"
@@ -1063,10 +1139,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
vertexSucceeded(job, vertex);
job.dagScheduler.vertexCompleted(vertex);
- } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
+ }
+ else if (vertexEvent.getVertexState() == VertexState.FAILED) {
+ job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
vertexFailed(job, vertex);
- } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
+ forceTransitionToKillWait = true;
+ }
+ else if (vertexEvent.getVertexState() == VertexState.KILLED) {
vertexKilled(job, vertex);
+ forceTransitionToKillWait = true;
}
if (LOG.isDebugEnabled()) {
@@ -1078,7 +1159,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", numVertices=" + job.numVertices);
}
- return checkJobForCompletion(job);
+ // if the job has not finished but a failure/kill occurred, then force the transition to TERMINATING.
+ DAGState state = checkJobForCompletion(job);
+ if(state == DAGState.RUNNING && forceTransitionToKillWait){
+ return DAGState.TERMINATING;
+ }
+ else {
+ return state;
+ }
}
private void vertexSucceeded(DAGImpl job, Vertex vertex) {
@@ -1106,6 +1194,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
diagnostics.add(diag);
}
+
+
+
private static class DiagnosticsUpdateTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2d81451..4aa0381 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -131,14 +132,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
- TaskEventType.T_KILL, new KillNewTransition())
+ TaskEventType.T_TERMINATE,
+ new KillNewTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
- TaskEventType.T_KILL, KILL_TRANSITION)
+ TaskEventType.T_TERMINATE,
+ KILL_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED,
@@ -169,7 +172,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
- TaskEventType.T_KILL, KILL_TRANSITION)
+ TaskEventType.T_TERMINATE,
+ KILL_TRANSITION)
// Transitions from KILL_WAIT state
.addTransition(TaskStateInternal.KILL_WAIT,
@@ -180,7 +184,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(
TaskStateInternal.KILL_WAIT,
TaskStateInternal.KILL_WAIT,
- EnumSet.of(TaskEventType.T_KILL,
+ EnumSet.of(
+ TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -205,13 +210,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ADD_SPEC_ATTEMPT))
+ EnumSet.of(
+ TaskEventType.T_TERMINATE,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ADD_SPEC_ATTEMPT))
+ EnumSet.of(
+ TaskEventType.T_TERMINATE,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
// create the topology tables
.installTopology();
@@ -687,7 +694,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.info(taskId + " Task Transitioned from " + oldState + " to "
+ getInternalState());
}
-
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a912876..b8392ed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1,5 +1,4 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
+/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -76,7 +75,9 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -86,6 +87,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
@@ -93,6 +95,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
@@ -194,8 +197,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_INIT,
new InitTransition())
.addTransition(VertexState.NEW, VertexState.KILLED,
- VertexEventType.V_KILL,
- new KillNewVertexTransition())
+ VertexEventType.V_TERMINATE,
+ new TerminateNewVertexTransition())
.addTransition(VertexState.NEW, VertexState.ERROR,
VertexEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
@@ -209,8 +212,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new StartTransition())
.addTransition(VertexState.INITED, VertexState.KILLED,
- VertexEventType.V_KILL,
- new KillInitedVertexTransition())
+ VertexEventType.V_TERMINATE,
+ new TerminateInitedVertexTransition())
.addTransition(VertexState.INITED, VertexState.ERROR,
VertexEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
@@ -225,11 +228,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition
(VertexState.RUNNING,
EnumSet.of(VertexState.RUNNING,
- VertexState.SUCCEEDED, VertexState.FAILED),
+ VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
- .addTransition(VertexState.RUNNING, VertexState.KILL_WAIT,
- VertexEventType.V_KILL, new KillTasksTransition())
+ .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
+ VertexEventType.V_TERMINATE,
+ new VertexKilledTransition())
.addTransition(VertexState.RUNNING, VertexState.RUNNING,
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledTransition())
@@ -241,25 +245,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Transitions from KILL_WAIT state.
+ // Transitions from TERMINATING state.
.addTransition
- (VertexState.KILL_WAIT,
- EnumSet.of(VertexState.KILL_WAIT, VertexState.KILLED),
+ (VertexState.TERMINATING,
+ EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
- .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+ .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) // TODO shouldnt be done for KILL_WAIT vertex
- .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+ .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(
- VertexState.KILL_WAIT,
+ VertexState.TERMINATING,
VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
- .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
- EnumSet.of(VertexEventType.V_KILL,
+ .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
+ EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE))
@@ -270,7 +274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
- EnumSet.of(VertexEventType.V_KILL,
+ EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -282,7 +286,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.FAILED, VertexState.FAILED,
- EnumSet.of(VertexEventType.V_KILL,
+ EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -294,7 +298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.KILLED, VertexState.KILLED,
- EnumSet.of(VertexEventType.V_KILL,
+ EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -304,7 +308,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR,
VertexState.ERROR,
EnumSet.of(VertexEventType.V_INIT,
- VertexEventType.V_KILL,
+ VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED,
@@ -349,6 +353,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
private final String javaOpts;
+ private VertexTerminationCause terminationCause;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, TezConfiguration conf, EventHandler eventHandler,
@@ -607,6 +612,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
readLock.unlock();
}
}
+
+ /**
+ * Set the terminationCause if it had not yet been set.
+ *
+ * @param trigger The trigger
+ * @return true if setting the value succeeded.
+ */
+ boolean trySetTerminationCause(VertexTerminationCause trigger) {
+ if(terminationCause == null){
+ terminationCause = trigger;
+ return true;
+ }
+ return false;
+ }
+
+ public VertexTerminationCause getTerminationCause(){
+ readLock.lock();
+ try {
+ return terminationCause;
+ } finally {
+ readLock.unlock();
+ }
+ }
@Override
public void scheduleTasks(Collection<TezTaskID> taskIDs) {
@@ -641,7 +669,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
eventHandler.handle(new VertexEvent(this.vertexId,
VertexEventType.INTERNAL_ERROR));
}
- //notify the eventhandler of state change
+
if (oldState != getInternalState()) {
LOG.info(vertexId + " transitioned from " + oldState + " to "
+ getInternalState());
@@ -715,57 +743,109 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
static VertexState checkVertexForCompletion(VertexImpl vertex) {
- //check for vertex failure first
+
if (LOG.isDebugEnabled()) {
LOG.debug("Checking for vertex completion"
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount);
+ + ", completedTaskCount=" + vertex.completedTaskCount
+ + ", terminationCause=" + vertex.terminationCause);
}
- if (vertex.failedTaskCount > 0) {
- vertex.setFinishTime();
- String diagnosticMsg = "Vertex failed as tasks failed. "
- + "failedTasks:"
- + vertex.failedTaskCount;
- LOG.info(diagnosticMsg);
- vertex.addDiagnostic(diagnosticMsg);
- vertex.abortVertex(VertexStatus.State.FAILED);
- return vertex.finished(VertexState.FAILED);
+ // log in case of task-completion accounting error.
+ if (vertex.completedTaskCount > vertex.tasks.size()) {
+ LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+ + ", failedTaskCount=" + vertex.failedTaskCount
+ + ", killedTaskCount=" + vertex.killedTaskCount
+ + ", successfulTaskCount=" + vertex.succeededTaskCount
+ + ", completedTaskCount=" + vertex.completedTaskCount
+ + ", terminationCause=" + vertex.terminationCause);
}
-
- if(vertex.succeededTaskCount == vertex.tasks.size()) {
- try {
- if (!vertex.committed.getAndSet(true)) {
- // commit only once
- vertex.committer.commitVertex();
+
+ if (vertex.completedTaskCount == vertex.tasks.size()) {
+ //Only succeed if tasks complete successfully and no terminationCause is registered.
+ if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
+ try {
+ if (!vertex.committed.getAndSet(true)) {
+ // commit only once
+ vertex.committer.commitVertex();
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+ vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
+ return vertex.finished(VertexState.FAILED);
}
- } catch (IOException e) {
- LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+ return vertex.finished(VertexState.SUCCEEDED);
+ }
+ else if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
+ + "failedTasks:"
+ + vertex.failedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.addDiagnostic(diagnosticMsg);
+ vertex.abortVertex(VertexStatus.State.KILLED);
+ return vertex.finished(VertexState.KILLED);
+ }
+ else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex killed as other vertex failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.addDiagnostic(diagnosticMsg);
+ vertex.abortVertex(VertexStatus.State.KILLED);
+ return vertex.finished(VertexState.KILLED);
+ }
+ else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
+ if(vertex.failedTaskCount == 0){
+ LOG.error("task failure accounting error. terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
+ }
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex killed as one or more tasks failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.addDiagnostic(diagnosticMsg);
+ vertex.abortVertex(VertexStatus.State.FAILED);
return vertex.finished(VertexState.FAILED);
}
- return vertex.finished(VertexState.SUCCEEDED);
- }
-
- if (vertex.completedTaskCount == vertex.tasks.size()) {
- // this means the vertex has some killed tasks
- assert vertex.killedTaskCount > 0;
- vertex.setFinishTime();
- vertex.abortVertex(VertexStatus.State.KILLED);
- return vertex.finished(VertexState.KILLED);
+ else {
+ //should never occur
+ throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
+ + ", failedTaskCount=" + vertex.failedTaskCount
+ + ", killedTaskCount=" + vertex.killedTaskCount
+ + ", successfulTaskCount=" + vertex.succeededTaskCount
+ + ", completedTaskCount=" + vertex.completedTaskCount
+ + ", terminationCause=" + vertex.terminationCause);
+ }
}
//return the current state, Vertex not finished yet
return vertex.getInternalState();
}
+ /**
+ * Set the terminationCause and send a kill-message to all tasks.
+ * The task-kill messages are only sent once.
+ * @param trigger the trigger that is causing the Vertex to transition to KILLED/FAILED
+ * @param taskterminationCause the termination cause to send to each task.
+ */
+ void enactKill(VertexTerminationCause trigger, TaskTerminationCause taskterminationCause) {
+ if(trySetTerminationCause(trigger)){
+ for (Task task : tasks.values()) {
+ eventHandler.handle(
+ new TaskEventTermination(task.getTaskId(), taskterminationCause));
+ }
+ }
+ }
+
VertexState finished(VertexState finalState) {
if (finishTime == 0) setFinishTime();
switch (finalState) {
case KILLED:
- case KILL_WAIT:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
finalState));
logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
@@ -810,6 +890,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.numTasks == 0) {
vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+ vertex.trySetTerminationCause(VertexTerminationCause.ZERO_TASKS);
vertex.abortVertex(VertexStatus.State.FAILED);
return vertex.finished(VertexState.FAILED);
}
@@ -871,6 +952,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.warn("Vertex init failed", e);
vertex.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
+ vertex.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
vertex.abortVertex(VertexStatus.State.FAILED);
// TODO: Metrics
//job.metrics.endPreparingJob(vertex);
@@ -1013,35 +1095,45 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
- private static class KillNewVertexTransition
+ private static class TerminateNewVertexTransition
implements SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
+ VertexEventTermination vet = (VertexEventTermination) event;
+ vertex.trySetTerminationCause(vet.getTerminationCause());
vertex.setFinishTime();
vertex.addDiagnostic("Vertex received Kill in NEW state.");
vertex.finished(VertexState.KILLED);
}
}
- private static class KillInitedVertexTransition
+ private static class TerminateInitedVertexTransition
implements SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
+ VertexEventTermination vet = (VertexEventTermination) event;
+ vertex.trySetTerminationCause(vet.getTerminationCause());
vertex.abortVertex(VertexStatus.State.KILLED);
vertex.addDiagnostic("Vertex received Kill in INITED state.");
vertex.finished(VertexState.KILLED);
}
}
- private static class KillTasksTransition
+ private static class VertexKilledTransition
implements SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
- for (Task task : vertex.tasks.values()) {
- vertex.eventHandler.handle(
- new TaskEvent(task.getTaskId(), TaskEventType.T_KILL));
+ VertexEventTermination vet = (VertexEventTermination) event;
+ VertexTerminationCause trigger = vet.getTerminationCause();
+ switch(trigger){
+ case DAG_KILL : vertex.enactKill(trigger, TaskTerminationCause.DAG_KILL); break;
+ case OTHER_VERTEX_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
+ case OWN_TASK_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
+ default://should not occur
+ throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
}
+
// TODO: Metrics
//job.metrics.endRunningJob(job);
}
@@ -1153,6 +1245,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
@@ -1160,6 +1253,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(vertex, task);
} else if (taskEvent.getState() == TaskState.FAILED) {
+ vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
+ forceTransitionToKillWait = true;
taskFailed(vertex, task);
} else if (taskEvent.getState() == TaskState.KILLED) {
taskKilled(vertex, task);
@@ -1167,6 +1262,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.vertexScheduler.onVertexCompleted();
VertexState state = VertexImpl.checkVertexForCompletion(vertex);
+ if(state == VertexState.RUNNING && forceTransitionToKillWait){
+ return VertexState.TERMINATING;
+ }
+
if(state == VertexState.SUCCEEDED) {
vertex.vertexScheduler.onVertexCompleted();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 421d639..a2585c6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -44,9 +44,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
@@ -82,6 +85,7 @@ public class TestDAGImpl {
private AppContext appContext;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
+ private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
private TaskAttemptListener taskAttemptListener;
@@ -120,6 +124,18 @@ public class TestDAGImpl {
}
}
+ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(TaskEvent event) {
+ DAGImpl handler = event.getTaskID().getVertexID().getDAGId().equals(dagId) ?
+ dag : mrrDag;
+ Vertex vertex = handler.getVertex(event.getTaskID().getVertexID());
+ Task task = vertex.getTask(event.getTaskID());
+ ((EventHandler<TaskEvent>)task).handle(event);
+ }
+ }
+
private class VertexEventDispatcher
implements EventHandler<VertexEvent> {
@@ -483,6 +499,8 @@ public class TestDAGImpl {
mrrAppContext);
doReturn(mrrDag).when(mrrAppContext).getDAG();
doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
+ taskEventDispatcher = new TaskEventDispatcher();
+ dispatcher.register(TaskEventType.class, taskEventDispatcher);
vertexEventDispatcher = new VertexEventDispatcher();
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dagEventDispatcher = new DagEventDispatcher();
@@ -613,13 +631,13 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
dispatcher.await();
- Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+ Assert.assertEquals(DAGState.TERMINATING, dag.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
- Assert.assertEquals(VertexState.KILL_WAIT, v1.getState());
+ Assert.assertEquals(VertexState.TERMINATING, v1.getState());
for (int i = 2 ; i < 6; ++i ) {
TezVertexID vId = new TezVertexID(dagId, i);
Vertex v = dag.getVertex(vId);
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
}
Assert.assertEquals(1, dag.getSuccessfulVertices());
}
@@ -666,7 +684,6 @@ public class TestDAGImpl {
@SuppressWarnings("unchecked")
@Test
- @Ignore
public void testVertexFailureHandling() {
initDAG(dag);
startDAG(dag);
@@ -689,13 +706,19 @@ public class TestDAGImpl {
for (int i = 3; i < 6; ++i) {
TezVertexID vId = new TezVertexID(dagId, i);
Vertex v = dag.getVertex(vId);
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
}
}
+ // a dag.kill() on an active DAG races with vertices all succeeding.
+ // if a DAG_KILL is processed while dag is in running state, it should end in KILLED,
+ // regardless of whether all vertices complete
+ //
+ // Final state:
+ // DAG is in KILLED state, with terminationCause = DAG_KILL
+ // Each vertex had kill triggered but raced ahead and ends in SUCCEEDED state.
@SuppressWarnings("unchecked")
@Test
- @Ignore
public void testDAGKill() {
initDAG(dag);
startDAG(dag);
@@ -708,19 +731,23 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
- dispatcher.getEventHandler().handle(
- new DAGEvent(dagId, DAGEventType.DAG_KILL));
-
+ dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
for (int i = 2; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
new TezVertexID(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertEquals(6, dag.getSuccessfulVertices());
+ for (Vertex v : dag.getVertices().values()) {
+ Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+ }
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
+ // DAG kill races with most vertices succeeding and one directly killed.
+ // Because the dag.kill() happens before the direct kill, the vertex has terminationCause=DAG_KILL
@SuppressWarnings("unchecked")
@Test
public void testDAGKillPending() {
@@ -735,21 +762,21 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
- dispatcher.getEventHandler().handle(
- new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
for (int i = 2; i < 5; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
new TezVertexID(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
- Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
+ Assert.assertEquals(DAGState.KILLED, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
new TezVertexID(dagId, 5), VertexState.KILLED));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
+ Assert.assertEquals(dag.getVertex(new TezVertexID(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 55d4354..6ed9e83 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -47,9 +47,11 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -128,7 +130,7 @@ public class TestTaskImpl {
}
private void killTask(TezTaskID taskId) {
- mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+ mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
assertTaskKillWaitState();
}
@@ -193,7 +195,7 @@ public class TestTaskImpl {
}
/**
- * {@link TaskState#KILL_WAIT}
+ * {@link TaskStateInternal#KILL_WAIT}
*/
private void assertTaskKillWaitState() {
assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 10e92d5..15c8a9b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -59,7 +59,9 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
@@ -70,6 +72,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.avro.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
@@ -105,6 +108,7 @@ public class TestVertexImpl {
private TezConfiguration conf = new TezConfiguration();
private Map<String, EdgeProperty> edges;
+ private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
@@ -156,12 +160,16 @@ public class TestVertexImpl {
}
}
- private class TaskEventHandler implements EventHandler<TaskEvent> {
+ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
+ VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID());
+ Task task = vertex.getTask(event.getTaskID());
+ ((EventHandler<TaskEvent>)task).handle(event);
}
}
-
+
private class DagEventDispatcher implements EventHandler<DAGEvent> {
public Map<DAGEventType, Integer> eventCount =
new HashMap<DAGEventType, Integer>();
@@ -221,6 +229,7 @@ public class TestVertexImpl {
.build();
return dag;
}
+
private DAGPlan createTestDAGPlan() {
LOG.info("Setting up dag plan");
@@ -511,13 +520,13 @@ public class TestVertexImpl {
edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(
dagPlan.getEdgeList());
parseVertexEdges();
+ taskEventDispatcher = new TaskEventDispatcher();
+ dispatcher.register(TaskEventType.class, taskEventDispatcher);
vertexEventDispatcher = new VertexEventDispatcher();
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dagEventDispatcher = new DagEventDispatcher();
dispatcher.register(DAGEventType.class, dagEventDispatcher);
- dispatcher.register(HistoryEventType.class,
- new HistoryHandler());
- dispatcher.register(TaskEventType.class, new TaskEventHandler());
+ dispatcher.register(HistoryEventType.class, new HistoryHandler());
dispatcher.init(conf);
dispatcher.start();
@@ -558,15 +567,12 @@ public class TestVertexImpl {
}
@SuppressWarnings("unchecked")
- private void killVertex(VertexImpl v, boolean checkKillWait) {
+ private void killVertex(VertexImpl v) {
dispatcher.getEventHandler().handle(
- new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
dispatcher.await();
- if (checkKillWait) {
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
- } else {
- Assert.assertEquals(VertexState.KILLED, v.getState());
- }
+ Assert.assertEquals(VertexState.KILLED, v.getState());
+ Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_KILL);
}
@SuppressWarnings("unchecked")
@@ -707,6 +713,7 @@ public class TestVertexImpl {
new VertexEventTaskCompleted(t1, TaskState.FAILED));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
String diagnostics =
StringUtils.join(",", v.getDiagnostics()).toLowerCase();
Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
@@ -715,7 +722,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexKillDiagnostics() {
VertexImpl v1 = vertices.get("vertex1");
- killVertex(v1, false);
+ killVertex(v1);
String diagnostics =
StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
Assert.assertTrue(diagnostics.contains(
@@ -723,7 +730,7 @@ public class TestVertexImpl {
VertexImpl v2 = vertices.get("vertex2");
initVertex(v2);
- killVertex(v2, false);
+ killVertex(v2);
diagnostics =
StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
LOG.info("diagnostics v2: " + diagnostics);
@@ -740,7 +747,7 @@ public class TestVertexImpl {
initVertex(v6);
startVertex(v3);
- killVertex(v3, true);
+ killVertex(v3);
diagnostics =
StringUtils.join(",", v3.getDiagnostics()).toLowerCase();
Assert.assertTrue(diagnostics.contains(
@@ -756,15 +763,15 @@ public class TestVertexImpl {
startVertex(v);
dispatcher.getEventHandler().handle(
- new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
dispatcher.await();
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
@@ -774,8 +781,7 @@ public class TestVertexImpl {
}
@SuppressWarnings("unchecked")
- @Test(timeout = 5000)
- @Ignore
+ @Test
public void testVertexKill() {
initAllVertices();
@@ -783,13 +789,15 @@ public class TestVertexImpl {
startVertex(v);
dispatcher.getEventHandler().handle(
- new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
- Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+ dispatcher.await();
+ Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
@@ -800,7 +808,6 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- @Ignore
public void testKilledTasksHandling() {
initAllVertices();
@@ -814,6 +821,7 @@ public class TestVertexImpl {
new VertexEventTaskCompleted(t1, TaskState.FAILED));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState());
}
@@ -869,6 +877,7 @@ public class TestVertexImpl {
new VertexEventTaskCompleted(t2, TaskState.FAILED));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.abortCounter);
}
@@ -970,6 +979,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.RUNNING, v6.getState());
Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
+
}
@SuppressWarnings("unchecked")
@@ -1099,6 +1109,7 @@ public class TestVertexImpl {
new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause());
Assert.assertEquals(1, committer.commitCounter);
// FIXME need to verify whether abort needs to be called if commit fails
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/999d16ae/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 5b79d3d..51341a2 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -60,6 +60,7 @@ import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
import org.apache.tez.dag.api.EdgeProperty.SourceType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
import org.apache.tez.engine.lib.input.ShuffledMergedInput;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
@@ -173,13 +174,34 @@ public class TestMRRJobsDAGApi {
// client.
@Test(timeout = 60000)
public void testMRRSleepJobDagSubmit() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(false);
+
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ // TODO Add additional checks for tracking URL etc. - once it's exposed by
+ // the DAG API.
+ }
+
+ // Submits a simple 3 stage sleep job using the DAG submit API, kills the DAG
+ // while it is running, and expects the final state to be KILLED.
+ @Test(timeout = 60000)
+ public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(true);
+
+ Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+ // TODO Add additional checks for tracking URL etc. - once it's exposed by
+ // the DAG API.
+ }
+
+ public State testMRRSleepJobDagSubmitCore(boolean killDagWhileRunning) throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
+ " not found. Not running test.");
- return;
+ return State.ERROR;
}
JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
@@ -368,6 +390,7 @@ public class TestMRRJobsDAGApi {
null, "default", Collections.singletonList(""), commonEnv,
amLocalResources, new TezConfiguration());
+
DAGStatus dagStatus = dagClient.getDAGStatus();
while (!dagStatus.isCompleted()) {
LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: "
@@ -375,12 +398,13 @@ public class TestMRRJobsDAGApi {
// TODO The test will fail if the AM sleep is removed. TEZ-207 to fix
// this.
Thread.sleep(500l);
+ if(killDagWhileRunning && dagStatus.getState() == DAGStatus.State.RUNNING){
+ dagClient.tryKillDAG();
+ dagStatus = dagClient.getDAGStatus();
+ }
dagStatus = dagClient.getDAGStatus();
}
-
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- // TODO Add additional checks for tracking URL etc. - once it's exposed by
- // the DAG API.
+ return dagStatus.getState();
}
private static LocalResource createLocalResource(FileSystem fc, Path file,