You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/28 06:24:48 UTC
tez git commit: TEZ-2488. Tez AM crashes if a submitted DAG is
configured to use invalid resource sizes. (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 02aafb55e -> 17b6aba9c
TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes. (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/17b6aba9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/17b6aba9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/17b6aba9
Branch: refs/heads/master
Commit: 17b6aba9c7f85d710eb97a006b9a42fbe0510cbc
Parents: 02aafb5
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 28 12:24:39 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 28 12:24:39 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/app/AppContext.java | 2 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 5 +++++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 16 ++++++++++++++++
.../apache/tez/dag/app/dag/impl/TestCommit.java | 4 +++-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 20 ++++++++++++++++++++
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 4 ++++
.../dag/app/dag/impl/TestVertexRecovery.java | 5 +++++
8 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9318d2..ef9a839 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -406,6 +406,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes.
TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
TEZ-2369. Add a few unit tests for RootInputInitializerManager.
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 4781784..c005447 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -91,6 +91,8 @@ public interface AppContext {
boolean isSession();
+ boolean isLocal();
+
DAGAppMasterState getAMState();
HistoryEventHandler getHistoryHandler();
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 195bc6c..0fadcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1423,6 +1423,11 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
public DAGAppMasterState getAMState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0a87241..38da302 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1413,6 +1413,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
VertexImpl v = createVertex(this, vertexName, i);
addVertex(v);
}
+ // check task resources, only check it in non-local mode
+ if (!appContext.isLocal()) {
+ for (Vertex v : vertexMap.values()) {
+ if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
+ String msg = "Vertex's TaskResource is beyond the cluster container capability," +
+ "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()
+ + ", Cluster MaxContainerCapability=" + appContext.getClusterInfo().getMaxContainerCapability();
+ LOG.error(msg);
+ addDiagnostic(msg);
+ finished(DAGState.FAILED);
+ return DAGState.FAILED;
+ }
+ }
+ }
createDAGEdges(this);
Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
@@ -1453,6 +1467,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
}
+
return DAGState.INITED;
}
@@ -1678,6 +1693,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGState state = dag.initializeDAG();
if (state != DAGState.INITED) {
+ dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
return state;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 3d3bca4..1ce9570 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
@@ -319,7 +320,8 @@ public class TestCommit {
taskAttemptListener, fsTokens, clock, "user", thh, appContext);
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
-
+ ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+ doReturn(clusterInfo).when(appContext).getClusterInfo();
dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4787247..e268a99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGScheduler;
@@ -182,6 +183,7 @@ public class TestDAGImpl {
private AppContext dagWithCustomEdgeAppContext;
private HistoryEventHandler historyEventHandler;
private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
+ private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
private DAGImpl chooseDAG(TezDAGID curDAGId) {
if (curDAGId.equals(dagId)) {
@@ -786,6 +788,7 @@ public class TestDAGImpl {
fsTokens, clock, "user", thh, appContext);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
+ doReturn(clusterInfo).when(appContext).getClusterInfo();
mrrAppContext = mock(AppContext.class);
doReturn(aclManager).when(mrrAppContext).getAMACLManager();
doReturn(execService).when(mrrAppContext).getExecService();
@@ -801,6 +804,7 @@ public class TestDAGImpl {
doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(mrrAppContext).getApplicationID();
doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
+ doReturn(clusterInfo).when(mrrAppContext).getClusterInfo();
groupAppContext = mock(AppContext.class);
doReturn(aclManager).when(groupAppContext).getAMACLManager();
doReturn(execService).when(groupAppContext).getExecService();
@@ -817,6 +821,7 @@ public class TestDAGImpl {
doReturn(appAttemptId.getApplicationId())
.when(groupAppContext).getApplicationID();
doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+ doReturn(clusterInfo).when(groupAppContext).getClusterInfo();
// reset totalCommitCounter to 0
TotalCountingOutputCommitter.totalCommitCounter = 0;
@@ -885,6 +890,7 @@ public class TestDAGImpl {
doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
doReturn(historyEventHandler).when(dagWithCustomEdgeAppContext).getHistoryHandler();
+ doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo();
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
}
@@ -922,6 +928,20 @@ public class TestDAGImpl {
}
@Test(timeout = 5000)
+ public void testDAGInitFailedDuetoInvalidResource() {
+ // cluster maxContainerCapability is less than the vertex resource request
+ ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(512,10));
+ doReturn(clusterInfo).when(appContext).getClusterInfo();
+ dag.handle(
+ new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.FAILED, dag.getState());
+ Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+ Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",")
+ .contains("Vertex's TaskResource is beyond the cluster container capability"));
+ }
+
+ @Test(timeout = 5000)
public void testDAGStart() {
initDAG(dag);
startDAG(dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index bd4653b..a0d5fb5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -28,12 +28,14 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
@@ -78,6 +80,8 @@ public class TestDAGRecovery {
public void setUp() {
mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
+ ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+ doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
mockEventHandler = mock(EventHandler.class);
tezCounters.findCounter("grp_1", "counter_1").increment(1);
http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index ad57ba8..2d03c60 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.*;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -51,6 +53,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.VertexState;
@@ -418,6 +421,8 @@ public class TestVertexRecovery {
new Credentials(), new SystemClock(), user,
mock(TaskHeartbeatHandler.class), mockAppContext);
when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+ ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+ doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
LOG.info("finish setUp");