You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2014/09/19 08:15:40 UTC

git commit: TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks (Rajesh Balamohan) (Cherry picked from commit 82ec16baf890a6914337544ff3f6a7715aa1b86e)

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 2d04d64aa -> 21ef8cd20


TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks (Rajesh Balamohan)
(Cherry picked from commit 82ec16baf890a6914337544ff3f6a7715aa1b86e)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/21ef8cd2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/21ef8cd2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/21ef8cd2

Branch: refs/heads/branch-0.5
Commit: 21ef8cd20e327927684e756b041a045df72d1c8f
Parents: 2d04d64
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Sep 19 11:42:45 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Sep 19 11:44:46 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/impl/ImmediateStartVertexManager.java   |  9 +++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 45 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/21ef8cd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e90634d..5c89d31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@ ALL CHANGES
   TEZ-1585. Memory leak in tez session mode.
   TEZ-1533. Request Events more often if a complete set of events is received by a task.
   TEZ-1587. Some tez-examples fail in local mode.
+  TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks.
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/21ef8cd2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index ac2b851..773426b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -64,7 +64,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
     for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
       String srcVertex = entry.getKey();
       EdgeProperty edgeProp = entry.getValue();
-      srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+      LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
+      //track vertices with task count > 0
+      if (getContext().getVertexNumTasks(srcVertex) > 0) {
+        srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+      } else {
+        LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertex
+            + " as it has got 0 tasks");
+      }
     }
 
     //handle completions

http://git-wip-us.apache.org/repos/asf/tez/blob/21ef8cd2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 699814e..bd60ccb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3642,6 +3642,8 @@ public class TestVertexImpl {
      *   M7 --(B)---------------->M5 ---(SG)--> R6
      *                            /
      *   M8---(C)--------------->/
+     *                          /
+     *   M9---(B)--------------> (zero task vertex)
      */
 
     //init M2
@@ -3650,15 +3652,23 @@ public class TestVertexImpl {
     VertexImpl r3 = vertices.get("R3");
     VertexImpl m5 = vertices.get("M5");
     VertexImpl m8 = vertices.get("M8");
+    VertexImpl m9 = vertices.get("M9");
 
     initVertex(m2);
     initVertex(m7);
     initVertex(m8);
+    initVertex(m9);
     assertTrue(m7.getState().equals(VertexState.INITED));
+    assertTrue(m9.getState().equals(VertexState.INITED));
     assertTrue(m5.getState().equals(VertexState.INITED));
     assertTrue(m8.getState().equals(VertexState.INITED));
     assertTrue(m7.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
 
+    //Start M9
+    dispatcher.getEventHandler().handle(new VertexEvent(m9.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+
     //Start M2; Let tasks complete in M2; Also let 1 task complete in R3
     dispatcher.getEventHandler().handle(new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
     dispatcher.await();
@@ -3708,6 +3718,8 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(new VertexEvent(m8.getVertexId(),VertexEventType.V_START));
     dispatcher.await();
 
+    assertTrue(m9.getState().equals(VertexState.SUCCEEDED));
+
     //M5 in running state. But tasks should not be scheduled until M8 finishes a task.
     assertTrue(m5.getState().equals(VertexState.RUNNING));
     for(Task task : m5.getTasks().values()) {
@@ -3740,6 +3752,8 @@ public class TestVertexImpl {
      *   M7 --(B)---------------->M5 ---(SG)--> R6
      *                            /
      *   M8---(C)--------------->/
+     *                          /
+     *   M9---(B)--------------> (zero task vertex)
      */
     DAGPlan dag = DAGPlan.newBuilder().setName("TestSamplerDAG")
         .addVertex(VertexPlan.newBuilder()
@@ -3778,6 +3792,24 @@ public class TestVertexImpl {
                 .addOutEdgeId("M8_M5")
                 .build()
         )
+        .addVertex(VertexPlan.newBuilder()
+                .setName("M9")
+                .setProcessorDescriptor(
+                    TezEntityDescriptorProto.newBuilder().setClassName("M9.class"))
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
+                .setTaskConfig(PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(0) //Zero task vertex
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("M9.class")
+                        .build()
+                )
+                .addOutEdgeId("M9_M5")
+                .build()
+        )
          .addVertex(VertexPlan.newBuilder()
                  .setName("R3")
                  .setProcessorDescriptor(
@@ -3815,6 +3847,7 @@ public class TestVertexImpl {
                 .addInEdgeId("R3_M5")
                 .addInEdgeId("M7_M5")
                 .addInEdgeId("M8_M5")
+                .addInEdgeId("M9_M5")
                 .addOutEdgeId("M5_R6")
                 .build()
         )
@@ -3904,6 +3937,18 @@ public class TestVertexImpl {
         )
         .addEdge(
             EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5"))
+                .setInputVertexName("M9")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5.class"))
+                .setOutputVertexName("M5")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("M9_M5")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5"))
                 .setInputVertexName("M8")
                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class"))