You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/29 00:01:47 UTC

git commit: TEZ-1143. 1-1 source split event should be handled in Vertex.RUNNING state (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master b3a9ec0e7 -> 80b91a4d8


TEZ-1143. 1-1 source split event should be handled in Vertex.RUNNING state (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/80b91a4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/80b91a4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/80b91a4d

Branch: refs/heads/master
Commit: 80b91a4d8ccb211b50901cfc43dc3cfad991ec3f
Parents: b3a9ec0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed May 28 15:00:59 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 28 15:01:36 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  36 +++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 115 +++++++++++++------
 .../tez/test/VertexManagerPluginForTest.java    |  46 ++++++++
 3 files changed, 151 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 17257ea..1cf63cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -362,6 +362,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
+          .addTransition(VertexState.RUNNING, 
+              EnumSet.of(VertexState.RUNNING),
+              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+              new OneToOneSourceSplitTransition())
           .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
               VertexEventType.V_TERMINATE,
               new VertexKilledTransition())
@@ -539,7 +543,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private RootInputInitializerRunner rootInputInitializer;
 
-  private VertexManager vertexManager;
+  VertexManager vertexManager;
   
   private final UserGroupInformation dagUgi;
 
@@ -1155,8 +1159,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           edge.startEventBuffering();
         }
   
-        LOG.info("Vertex " + getVertexId() + 
-            " parallelism set to " + parallelism + " from " + numTasks);
         // assign to local variable of LinkedHashMap to make sure that changing
         // type of task causes compile error. We depend on LinkedHashMap for order
         LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
@@ -1179,6 +1181,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           LOG.info("Removing task: " + entry.getKey());
           iter.remove();
         }
+        LOG.info("Vertex " + logIdentifier + 
+            " parallelism set to " + parallelism + " from " + numTasks);
         this.numTasks = parallelism;
         assert tasks.size() == numTasks;
   
@@ -2587,8 +2591,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventOneToOneSourceSplit splitEvent = 
           (VertexEventOneToOneSourceSplit)event;
       TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
+      
       if (vertex.originalOneToOneSplitSource != null) {
-        Preconditions.checkState(vertex.getState() == VertexState.INITED, 
+        VertexState state = vertex.getState();
+        Preconditions.checkState((state == VertexState.INITED || state == VertexState.RUNNING), 
             " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
             " in state " + vertex.getState() + 
             " . Split in vertex " + originalSplitSource + 
@@ -2601,27 +2607,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               " because of split in vertex " + originalSplitSource + 
               " sent by vertex " + splitEvent.getSenderVertex() +
               " numTasks " + splitEvent.getNumTasks());
-          return VertexState.INITED;
+          return state;
         }
         // cannot split from multiple sources
         throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + 
             " asked to split by: " + originalSplitSource + 
             " but was already split by:" + vertex.originalOneToOneSplitSource);
       }
-      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
-          " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
-              " in state " + vertex.getState() +
-              " . Split in vertex " + originalSplitSource +
-              " sent by vertex " + splitEvent.getSenderVertex() +
-              " numTasks " + splitEvent.getNumTasks());
+      
       LOG.info("Splitting vertex " + vertex.getVertexId() + 
           " because of split in vertex " + originalSplitSource + 
           " sent by vertex " + splitEvent.getSenderVertex() +
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
-      // ZZZ Can this be handled ?
       vertex.setParallelism(splitEvent.getNumTasks(), null, null);
-      return vertex.initializeVertexInInitializingState();
+      if (vertex.getState() == VertexState.RUNNING) {
+        return VertexState.RUNNING;
+      } else {
+        Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+            " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+                " in state " + vertex.getState() +
+                " . Split in vertex " + originalSplitSource +
+                " sent by vertex " + splitEvent.getSenderVertex() +
+                " numTasks " + splitEvent.getNumTasks());
+        return vertex.initializeVertexInInitializingState();        
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a813ebf..05c6569 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -125,6 +125,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.test.EdgeManagerForTest;
+import org.apache.tez.test.VertexManagerPluginForTest;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
@@ -280,7 +281,7 @@ public class TestVertexImpl {
     }
 
   }
-
+  
   private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
@@ -473,44 +474,56 @@ public class TestVertexImpl {
     return dag;
   }
   
-  private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName) {
+  private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, int numTasks) {
+    VertexPlan.Builder v1Builder = VertexPlan.newBuilder();
+    v1Builder.setName("vertex1")
+    .setType(PlanVertexType.NORMAL)
+    .addOutEdgeId("e1")
+    .addOutEdgeId("e2");
+    if (initializerClassName != null) {
+      numTasks = -1;
+      v1Builder.addInputs(
+          RootInputLeafOutputProto.newBuilder()
+          .setInitializerClassName(initializerClassName)
+          .setName("input1")
+          .setEntityDescriptor(
+              TezEntityDescriptorProto.newBuilder()
+                  .setClassName("InputClazz")
+                  .build()
+          ).build()
+      ).setTaskConfig(
+          PlanTaskConfiguration.newBuilder()
+          .setNumTasks(-1)
+          .setVirtualCores(4)
+          .setMemoryMb(1024)
+          .setJavaOpts("")
+          .setTaskModule("x1.y1")
+          .build()
+      );
+    } else {
+      v1Builder.setTaskConfig(
+          PlanTaskConfiguration.newBuilder()
+          .setNumTasks(numTasks)
+          .setVirtualCores(4)
+          .setMemoryMb(1024)
+          .setJavaOpts("")
+          .setTaskModule("x1.y1")
+          .build()
+      );
+    }
+    VertexPlan v1Plan = v1Builder.build();
+    
     LOG.info("Setting up one to one dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testVertexOneToOneSplit")
-        .addVertex(
-            VertexPlan.newBuilder()
-                .setName("vertex1")
-                .setType(PlanVertexType.NORMAL)
-                .addInputs(
-                    RootInputLeafOutputProto.newBuilder()
-                    .setInitializerClassName(initializerClassName)
-                    .setName("input1")
-                    .setEntityDescriptor(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("InputClazz")
-                            .build()
-                    ).build()
-                )
-                .setTaskConfig(
-                    PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
-                    .setVirtualCores(4)
-                    .setMemoryMb(1024)
-                    .setJavaOpts("")
-                    .setTaskModule("x1.y1")
-                    .build()
-                )
-                .addOutEdgeId("e1")
-                .addOutEdgeId("e2")
-            .build()
-        )
+        .addVertex(v1Plan)
         .addVertex(
             VertexPlan.newBuilder()
                 .setName("vertex2")
                 .setType(PlanVertexType.NORMAL)
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
+                    .setNumTasks(numTasks)
                     .setVirtualCores(4)
                     .setMemoryMb(1024)
                     .setJavaOpts("")
@@ -527,7 +540,7 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
+                    .setNumTasks(numTasks)
                     .setVirtualCores(4)
                     .setMemoryMb(1024)
                     .setJavaOpts("")
@@ -544,7 +557,7 @@ public class TestVertexImpl {
                 .setType(PlanVertexType.NORMAL)
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
+                    .setNumTasks(numTasks)
                     .setVirtualCores(4)
                     .setMemoryMb(1024)
                     .setJavaOpts("")
@@ -2164,7 +2177,7 @@ public class TestVertexImpl {
     // vertex with 2 incoming splits from the same source should split once
     useCustomInitializer = true;
     setupPreDagCreation();
-    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer");
+    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1);
     setupPostDagCreation();
     initAllVertices(VertexState.INITIALIZING);
     
@@ -2198,6 +2211,42 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
   }
+  
+  @Test(timeout = 5000)
+  public void testVertexWithOneToOneSplitWhileRunning() { // checks that a parallelism change on vertex1 reaches vertex2..4 while all four are RUNNING
+    int numTasks = 5; // initial task count passed to the plan builder for every vertex
+    // Create a diamond-shaped DAG with 1-1 edges.
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks); // null initializer: vertex1 gets a fixed task count and no root input
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    initAllVertices(VertexState.INITED);
+    
+    // Swap in a vertex manager backed by the test plugin so that tasks don't start running.
+    v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+        v1, appContext);
+    startVertex(v1);
+    // Before the change: vertex2..4 report the original task count and all vertices are RUNNING.
+    Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
+    Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
+    Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+    System.out.println("xxx"); // NOTE(review): looks like leftover debug output - confirm and remove
+    // Change vertex1's parallelism while running; vertex2..4 are expected to follow.
+    int newNumTasks = 3;
+    v1.setParallelism(newNumTasks, null, null);
+    dispatcher.await(); // presumably blocks until queued events have been handled - confirm
+    Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
+    Assert.assertEquals(newNumTasks, vertices.get("vertex3").getTotalTasks());
+    Assert.assertEquals(newNumTasks, vertices.get("vertex4").getTotalTasks());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+  }
 
   @Test(timeout = 5000)
   public void testHistoryEventGeneration() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
new file mode 100644
index 0000000..323fd08
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+public class VertexManagerPluginForTest implements VertexManagerPlugin { // test stub: every callback below has an empty body
+  @Override
+  public void initialize(VertexManagerPluginContext context) {} // no-op; the context is not stored
+
+  @Override
+  public void onVertexStarted(Map<String, List<Integer>> completions) {} // no-op; this plugin takes no action when the vertex starts
+
+  @Override
+  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {} // no-op
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {} // no-op
+
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {} // no-op; the events are ignored
+}