You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/22 19:56:40 UTC

git commit: TEZ-639. Make sure Input initializers are called if they're specified, irrespective of parallelism. (sseth)

Updated Branches:
  refs/heads/master 8a8ba651b -> 2f25ef6b8


TEZ-639. Make sure Input initializers are called if they're specified,
irrespective of parallelism. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2f25ef6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2f25ef6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2f25ef6b

Branch: refs/heads/master
Commit: 2f25ef6b81650dff45849fd086d5265830863141
Parents: 8a8ba65
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Nov 22 10:56:02 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Nov 22 10:56:35 2013 -0800

----------------------------------------------------------------------
 .../app/dag/impl/RootInputVertexManager.java    |  2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 +++++++++++++++-----
 .../org/apache/tez/mapreduce/input/MRInput.java |  2 +-
 3 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 0661b7c..0d1aabb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -45,7 +45,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class RootInputVertexManager implements VertexScheduler {
-
+  
   private final Vertex managedVertex;
   private final EventMetaData sourceInfo;
   private final Map<String, EventMetaData> destInfoMap;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3eadded..179ed1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -464,6 +464,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final String javaOpts;
   private final ContainerContext containerContext;
   private VertexTerminationCause terminationCause;
+  
+  private String logIdentifier;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
@@ -528,6 +530,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       setAdditionalOutputs(vertexPlan.getOutputsList());
     }
 
+    logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -1287,15 +1290,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // setup vertex scheduler
         // TODO this needs to consider data size and perhaps API.
         // Currently implicitly BIPARTITE is the only edge type
-        LOG.info("Setting vertexManager to ShuffleVertexManager");
+        LOG.info("Setting vertexManager to ShuffleVertexManager for " + vertex.logIdentifier);
         vertex.vertexScheduler = new ShuffleVertexManager(vertex);
       } else if (vertex.inputsWithInitializers != null) {
-        LOG.info("Setting vertexManager to RootInputVertexManager");
+        LOG.info("Setting vertexManager to RootInputVertexManager for " + vertex.logIdentifier);
         vertex.vertexScheduler = new RootInputVertexManager(vertex,
             vertex.eventHandler);
       } else {
         // schedule all tasks upon vertex start
-        LOG.info("Setting vertexManager to ImmediateStartVertexManager");
+        LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + vertex.logIdentifier);
         vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
       }
       vertex.vertexScheduler.initialize(vertex.conf);
@@ -1369,11 +1372,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 
         return VertexState.INITIALIZING;
       } else {
-        vertex.createTasks();
+        if (vertex.inputsWithInitializers != null) {
+          vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+              vertex.eventHandler, vertex.getTotalTasks());
+          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
+              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+          for (String inputName : vertex.inputsWithInitializers) {
+            inputList.add(vertex.additionalInputs.get(inputName));
+          }
+          LOG.info("Starting root input initializers: "
+              + vertex.inputsWithInitializers.size());
+          vertex.rootInputInitializer.runInputInitializers(inputList);
+          vertex.createTasks();
+          return VertexState.INITIALIZING;
+        } else {
+          vertex.createTasks();
+          return vertex.initializeVertex();
+        }
       }
-
-
-      return vertex.initializeVertex();
     }
   } // end of InitTransition
 
@@ -1727,7 +1744,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
-      LOG.info("Num completed Tasks for " + vertex.getVertexId() + " [" + vertex.getName() + "] : "
+      LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
           + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 3c6f46b..5db41d9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -123,7 +123,7 @@ public class MRInput implements LogicalInput {
     MRInputUserPayloadProto mrUserPayload =
       MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
-        "All split information not expected in MRInput");
+        "Split information not expected in MRInput");
     Configuration conf =
       MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
     this.jobConf = new JobConf(conf);