You are viewing a plain-text version of this content. (The canonical link to the original page was not preserved in this copy.)
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/28 06:24:48 UTC

tez git commit: TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes. (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 02aafb55e -> 17b6aba9c


TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes. (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/17b6aba9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/17b6aba9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/17b6aba9

Branch: refs/heads/master
Commit: 17b6aba9c7f85d710eb97a006b9a42fbe0510cbc
Parents: 02aafb5
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 28 12:24:39 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 28 12:24:39 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/dag/app/AppContext.java |  2 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  5 +++++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 16 ++++++++++++++++
 .../apache/tez/dag/app/dag/impl/TestCommit.java |  4 +++-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 20 ++++++++++++++++++++
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  4 ++++
 .../dag/app/dag/impl/TestVertexRecovery.java    |  5 +++++
 8 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9318d2..ef9a839 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -406,6 +406,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes.
   TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 4781784..c005447 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -91,6 +91,8 @@ public interface AppContext {
 
   boolean isSession();
 
+  boolean isLocal();
+
   DAGAppMasterState getAMState();
 
   HistoryEventHandler getHistoryHandler();

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 195bc6c..0fadcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1423,6 +1423,11 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public boolean isLocal() {
+      return isLocal;
+    }
+
+    @Override
     public DAGAppMasterState getAMState() {
       return state;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0a87241..38da302 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1413,6 +1413,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       VertexImpl v = createVertex(this, vertexName, i);
       addVertex(v);
     }
+    // check task resources, only check it in non-local mode
+    if (!appContext.isLocal()) {
+      for (Vertex v : vertexMap.values()) {
+        if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
+          String msg = "Vertex's TaskResource is beyond the cluster container capability," +
+              "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()
+              + ", Cluster MaxContainerCapability=" + appContext.getClusterInfo().getMaxContainerCapability();
+          LOG.error(msg);
+          addDiagnostic(msg);
+          finished(DAGState.FAILED);
+          return DAGState.FAILED;
+        }
+      }
+    }
 
     createDAGEdges(this);
     Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
@@ -1453,6 +1467,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         }
       }
     }
+
     return DAGState.INITED;
   }
 
@@ -1678,6 +1693,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
       DAGState state = dag.initializeDAG();
       if (state != DAGState.INITED) {
+        dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
         return state;
       }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 3d3bca4..1ce9570 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -319,7 +320,8 @@ public class TestCommit {
         taskAttemptListener, fsTokens, clock, "user", thh, appContext);
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
-
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+    doReturn(clusterInfo).when(appContext).getClusterInfo();
     dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4787247..e268a99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGScheduler;
@@ -182,6 +183,7 @@ public class TestDAGImpl {
   private AppContext dagWithCustomEdgeAppContext;
   private HistoryEventHandler historyEventHandler;
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
+  private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
 
   private DAGImpl chooseDAG(TezDAGID curDAGId) {
     if (curDAGId.equals(dagId)) {
@@ -786,6 +788,7 @@ public class TestDAGImpl {
         fsTokens, clock, "user", thh, appContext);
     dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
     doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(clusterInfo).when(appContext).getClusterInfo();
     mrrAppContext = mock(AppContext.class);
     doReturn(aclManager).when(mrrAppContext).getAMACLManager();
     doReturn(execService).when(mrrAppContext).getExecService();
@@ -801,6 +804,7 @@ public class TestDAGImpl {
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
     doReturn(appAttemptId.getApplicationId()).when(mrrAppContext).getApplicationID();
     doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
+    doReturn(clusterInfo).when(mrrAppContext).getClusterInfo();
     groupAppContext = mock(AppContext.class);
     doReturn(aclManager).when(groupAppContext).getAMACLManager();
     doReturn(execService).when(groupAppContext).getExecService();
@@ -817,6 +821,7 @@ public class TestDAGImpl {
     doReturn(appAttemptId.getApplicationId())
         .when(groupAppContext).getApplicationID();
     doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+    doReturn(clusterInfo).when(groupAppContext).getClusterInfo();
 
     // reset totalCommitCounter to 0
     TotalCountingOutputCommitter.totalCommitCounter = 0;
@@ -885,6 +890,7 @@ public class TestDAGImpl {
     doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
     doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
     doReturn(historyEventHandler).when(dagWithCustomEdgeAppContext).getHistoryHandler();
+    doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo();
     dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
     dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
   }
@@ -922,6 +928,20 @@ public class TestDAGImpl {
   }
 
   @Test(timeout = 5000)
+  public void testDAGInitFailedDuetoInvalidResource() {
+    // cluster maxContainerCapability is less than the vertex resource request
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(512,10));
+    doReturn(clusterInfo).when(appContext).getClusterInfo();
+    dag.handle(
+        new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
+    Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",")
+        .contains("Vertex's TaskResource is beyond the cluster container capability"));
+  }
+
+  @Test(timeout = 5000)
   public void testDAGStart() {
     initDAG(dag);
     startDAG(dag);

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index bd4653b..a0d5fb5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -28,12 +28,14 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -78,6 +80,8 @@ public class TestDAGRecovery {
   public void setUp() {
     mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
     when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
     mockEventHandler = mock(EventHandler.class);
     tezCounters.findCounter("grp_1", "counter_1").increment(1);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/17b6aba9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index ad57ba8..2d03c60 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -51,6 +53,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -418,6 +421,8 @@ public class TestVertexRecovery {
             new Credentials(), new SystemClock(), user,
             mock(TaskHeartbeatHandler.class), mockAppContext);
     when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
 
     dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
     LOG.info("finish setUp");