You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/12/20 19:59:07 UTC

git commit: TEZ-683. Diamond shape DAG fail. (hitesh)

Updated Branches:
  refs/heads/master 27fd81e4f -> 97e9a5ef3


TEZ-683. Diamond shape DAG fail. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/97e9a5ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/97e9a5ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/97e9a5ef

Branch: refs/heads/master
Commit: 97e9a5ef36f94b6620ee91d961b950745470d5b4
Parents: 27fd81e
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Dec 20 10:58:26 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Dec 20 10:58:26 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  10 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 352 +++++++++++++++++--
 2 files changed, 338 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97e9a5ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5f89df7..6344652 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -211,6 +211,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.INITIALIZING, VertexState.FAILED),
               VertexEventType.V_INIT,
               new InitTransition())
+          .addTransition(VertexState.NEW, VertexState.NEW,
+              VertexEventType.V_SOURCE_VERTEX_STARTED,
+              new SourceVertexStartedTransition())
           .addTransition(VertexState.NEW, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
               new TerminateNewVertexTransition())
@@ -322,6 +325,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -358,6 +362,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
@@ -956,8 +961,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
 
       if (oldState != getInternalState()) {
-        LOG.info(vertexId + " transitioned from " + oldState + " to "
-                 + getInternalState());
+        LOG.info(logIdentifier + " transitioned from " + oldState + " to "
+            + getInternalState() + " due to event "
+            + event.getType());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97e9a5ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 78c7dcc..61c9745 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -76,6 +78,8 @@ import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -104,6 +108,8 @@ import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
 
 import com.google.common.collect.Lists;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestVertexImpl {
 
@@ -123,13 +129,13 @@ public class TestVertexImpl {
   private Clock clock = new SystemClock();
   private TaskHeartbeatHandler thh;
   private AppContext appContext;
-  private VertexLocationHint vertexLocationHint;
+  private VertexLocationHint vertexLocationHint = null;
   private Configuration conf;
   private Map<String, Edge> edges;
 
+  private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
-
   private DagEventDispatcher dagEventDispatcher;
 
   private class CountingVertexOutputCommitter extends
@@ -190,6 +196,18 @@ public class TestVertexImpl {
     }
   }
 
+  private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      VertexImpl vertex = vertexIdMap.get(
+        event.getTaskAttemptID().getTaskID().getVertexID());
+      Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
+      ((EventHandler<TaskAttemptEvent>)task.getAttempt(
+          event.getTaskAttemptID())).handle(event);
+    }
+  }
+
   private class TaskEventDispatcher implements EventHandler<TaskEvent> {
     @SuppressWarnings("unchecked")
     @Override
@@ -295,25 +313,26 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .addInputs(
                     RootInputLeafOutputProto.newBuilder()
-                    .setInitializerClassName(initializerClassName)
-                    .setName("input2")
-                    .setEntityDescriptor(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("InputClazz")
-                            .build()
-                    ).build()
-                )
+                        .setInitializerClassName(initializerClassName)
+                        .setName("input2")
+                        .setEntityDescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                              .setClassName("InputClazz")
+                              .build()
+                        )
+                        .build()
+                    )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
-                    .setVirtualCores(4)
-                    .setMemoryMb(1024)
-                    .setJavaOpts("")
-                    .setTaskModule("x2.y2")
-                    .build()
+                        .setNumTasks(-1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
                 )
                 .addInEdgeId("e1")
-            .build()
+              .build()
         )
         .addEdge(
             EdgePlan.newBuilder()
@@ -327,7 +346,7 @@ public class TestVertexImpl {
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                 .build()
             )
-    .build();
+        .build();
     return dag;
   }
   
@@ -692,6 +711,230 @@ public class TestVertexImpl {
     return dag;
   }
 
+  // Create a plan with 3 vertices: A, B, C
+  // A -> B, A -> C, B -> C
+  private DAGPlan createSamplerDAGPlan() {
+    LOG.info("Setting up dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("TestSamplerDAG")
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("A")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+              PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+            )
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(1)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("A.class")
+                .build()
+            )
+            .addOutEdgeId("A_B")
+            .addOutEdgeId("A_C")
+            .build()
+        )
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("B")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+              PlanTaskLocationHint.newBuilder()
+                .addHost("host2")
+                .addRack("rack2")
+                .build()
+            )
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("")
+                .build()
+            )
+            .addInEdgeId("A_B")
+            .addOutEdgeId("B_C")
+            .build()
+        )
+        .addVertex(
+          VertexPlan.newBuilder()
+            .setName("C")
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+            .addTaskLocationHint(
+              PlanTaskLocationHint.newBuilder()
+                .addHost("host3")
+                .addRack("rack3")
+                .build()
+            )
+            .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("foo")
+                .setTaskModule("x3.y3")
+                .build()
+            )
+            .addInEdgeId("A_C")
+            .addInEdgeId("B_C")
+            .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+                .setInputVertexName("A")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+                .setOutputVertexName("B")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("A_B")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_C"))
+                .setInputVertexName("A")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("A_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setInputVertexName("B")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("B_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+          )
+        .build();
+
+    return dag;
+  }
+
+  // Create a plan with 3 vertices: A, B, C
+  // A -> C, B -> C
+  private DAGPlan createSamplerDAGPlan2() {
+    LOG.info("Setting up dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("TestSamplerDAG")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("A")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host1")
+                        .addRack("rack1")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("A.class")
+                        .build()
+                )
+                .addOutEdgeId("A_C")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("B")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host2")
+                        .addRack("rack2")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("")
+                        .build()
+                )
+                .addOutEdgeId("B_C")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("C")
+                .setType(PlanVertexType.NORMAL)
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host3")
+                        .addRack("rack3")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("foo")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addInEdgeId("A_C")
+                .addInEdgeId("B_C")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_C"))
+                .setInputVertexName("A")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("A_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setInputVertexName("B")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+                .setOutputVertexName("C")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("B_C")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+
+    return dag;
+  }
+
   private void setupVertices() {
     int vCnt = dagPlan.getVertexCount();
     LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
@@ -701,15 +944,17 @@ public class TestVertexImpl {
       VertexPlan vPlan = dagPlan.getVertex(i);
       String vName = vPlan.getName();
       TezVertexID vertexId = TezVertexID.getInstance(dagId, i+1);
-      VertexImpl v;
+      VertexImpl v = null;
+      VertexLocationHint locationHint = DagTypeConverters.convertFromDAGPlan(
+          vPlan.getTaskLocationHintList());
       if (useCustomInitializer) {
         v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
-          clock, thh, appContext, vertexLocationHint, dispatcher);
+          clock, thh, appContext, locationHint, dispatcher);
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
-            clock, thh, appContext, vertexLocationHint);
+            clock, thh, appContext, locationHint);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -780,8 +1025,20 @@ public class TestVertexImpl {
     doReturn(dagId).when(dag).getID();
     doReturn(taskScheduler).when(appContext).getTaskScheduler();
     doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+
     setupVertices();
-    
+    when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
+      @Override
+      public Vertex answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length != 1) {
+          return null;
+        }
+        TezVertexID vId = (TezVertexID) args[0];
+        return vertexIdMap.get(vId);
+      }
+    });
+
     // TODO - this test logic is tightly linked to impl DAGImpl code.
     edges = new HashMap<String, Edge>();
     for (EdgePlan edgePlan : dagPlan.getEdgeList()) {
@@ -791,6 +1048,8 @@ public class TestVertexImpl {
     }
 
     parseVertexEdges();
+    taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
@@ -1746,4 +2005,53 @@ public class TestVertexImpl {
     }
   }
 
+  @Test(timeout=5000)
+  public void testInitStartRace() {
+    // Race when a source vertex manages to start before the target vertex has
+    // been initialized
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan();
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_START));
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vB.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+  }
+
+  @Test(timeout=5000)
+  public void testInitStartRace2() {
+    // Race when two independent source vertices (A and B) manage to start
+    // before their shared target vertex (C) has been initialized
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan2();
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+        VertexEventType.V_START));
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vB.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+  }
 }