You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:49 UTC

[43/50] [abbrv] git commit: TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas)

TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6790520
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6790520
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6790520

Branch: refs/heads/branch-0.5
Commit: b6790520ddccb499e590efd3b567751e405a833a
Parents: 51c8a8b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 25 14:53:44 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 25 14:53:44 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  70 +++++-------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 106 +++++++++++++++++++
 3 files changed, 137 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7fa961..f790290 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,8 @@ ALL CHANGES:
   TEZ-1240. Add system test for propagation of diagnostics for errors
   TEZ-1618. LocalTaskSchedulerService.getTotalResources() and getAvailableResources() can get negative if JVM memory is larger than 2GB
   TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials
+  TEZ-1592. Vertex should wait for all initializers to finish before moving to
+  INITED state
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ab22099..34fffd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2670,25 +2670,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + " to set #tasks for the vertex " + vertex.getVertexId());
 
         if (vertex.inputsWithInitializers != null) {
-          // Use DAGScheduler to arbitrate resources among vertices later
-          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
-              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, -1,
-              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
-              vertex.getTaskResource(),
-              vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
-              inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
-          for (String inputName : vertex.inputsWithInitializers) {
-            inputList.add(vertex.rootInputDescriptors.get(inputName));
-          }
-          LOG.info("Vertex will initialize via inputInitializers "
-              + vertex.logIdentifier + ". Starting root input initializers: "
-              + vertex.inputsWithInitializers.size());
-          vertex.rootInputInitializerManager.runInputInitializers(inputList);
-          // Send pending rootInputInitializerEvents
-          vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
-          vertex.pendingInitializerEvents.clear();
+          vertex.setupInputInitializerManager();
           return VertexState.INITIALIZING;
         } else {
           boolean hasOneToOneUninitedSource = false;
@@ -2716,27 +2698,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       } else {
         LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
         vertex.createTasks();
+
         if (vertex.inputsWithInitializers != null) {
-          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
-              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, vertex.getTotalTasks(),
-              vertex.appContext.getTaskScheduler().getNumClusterNodes(),
-              vertex.getTaskResource(),
-              vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
-          inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
-          for (String inputName : vertex.inputsWithInitializers) {
-            inputList.add(vertex.rootInputDescriptors.get(inputName));
-          }
-          LOG.info("Starting root input initializers: "
-              + vertex.inputsWithInitializers.size());
-          // special case when numTasks>0 and still we want to stay in initializing
-          // state. This is handled in RootInputInitializedTransition specially.
-          vertex.initWaitsForRootInitializers = true;
-          vertex.rootInputInitializerManager.runInputInitializers(inputList);
-          // Send pending rootInputInitializerEvents
-          vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
-          vertex.pendingInitializerEvents.clear();
+          vertex.setupInputInitializerManager();
           return VertexState.INITIALIZING;
         }
         if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2832,8 +2796,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       // done. check if we need to do the initialization
       if (vertex.getState() == VertexState.INITIALIZING &&
           vertex.initWaitsForRootInitializers) {
-        // set the wait flag to false
-        vertex.initWaitsForRootInitializers = false;
+        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
+          // set the wait flag to false if all initializers are done
+          vertex.initWaitsForRootInitializers = false;
+        }
         // initialize vertex if possible and needed
         if (vertex.canInitVertex()) {
           Preconditions.checkState(vertex.numTasks >= 0,
@@ -3572,6 +3538,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private void setupInputInitializerManager() {
+    rootInputInitializerManager = createRootInputInitializerManager(
+        getDAG().getName(), getName(), getVertexId(),
+        eventHandler, getTotalTasks(),
+        appContext.getTaskScheduler().getNumClusterNodes(),
+        getTaskResource(),
+        appContext.getTaskScheduler().getTotalResources());
+    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+        inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
+    for (String inputName : inputsWithInitializers) {
+      inputList.add(rootInputDescriptors.get(inputName));
+    }
+    LOG.info("Vertex will initialize via inputInitializers "
+        + logIdentifier + ". Starting root input initializers: "
+        + inputsWithInitializers.size());
+    initWaitsForRootInitializers = true;
+    rootInputInitializerManager.runInputInitializers(inputList);
+    // Send pending rootInputInitializerEvents
+    rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents);
+    pendingInitializerEvents.clear();
+  }
+
   private static class VertexStateChangedCallback
       implements OnStateChangedCallback<VertexState, VertexImpl> {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a1b9847..e71acb6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -496,6 +496,52 @@ public class TestVertexImpl {
     return dag;
   }
 
+  private DAGPlan createDAGPlanWithMultipleInitializers(String initializerClassName) {
+    LOG.info("Setting up dag plan with multiple input initializer");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testVertexWithMultipleInitializers")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
+                        .setName("input1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        ).build()
+                )
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                initializerClassName))
+                        .setName("input2")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        ).build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(-1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .build()
+        ).build();
+    return dag;
+  }
+
   private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
     LOG.info("Setting up dag plan with input initializer");
     DAGPlan dag = DAGPlan.newBuilder()
@@ -3969,6 +4015,60 @@ public class TestVertexImpl {
     return dag;
   }
 
+  @Test(timeout = 5000)
+  public void testVertexWithMultipleInitializers1() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+        .get("vertex1");
+
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+
+    // Complete initializer which sets parallelism first
+    initializerManager1.completeInputInitialization(0, 5, v1Hints);
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    // Complete second initializer
+    initializerManager1.completeInputInitialization(1);
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexWithMultipleInitializers2() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+        .get("vertex1");
+
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+
+    // Complete initializer which does not set parallelism
+    initializerManager1.completeInputInitialization(1);
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    // Complete second initializer which sets parallelism
+    initializerManager1.completeInputInitialization(0, 5, v1Hints);
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexWithInitializerSuccess() {
@@ -4363,6 +4463,12 @@ public class TestVertexImpl {
       dispatcher.await();
     }
 
+    public void completeInputInitialization(int initializerIndex) {
+      eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
+          .get(initializerIndex).getName(), null));
+      dispatcher.await();
+    }
+
     public void completeInputInitialization(int initializerIndex, int targetTasks,
         List<TaskLocationHint> locationHints) {
       List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);