You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/22 19:56:40 UTC
git commit: TEZ-639. Make sure Input initializers are called if
they're specified, irrespective of parallelism. (sseth)
Updated Branches:
refs/heads/master 8a8ba651b -> 2f25ef6b8
TEZ-639. Make sure Input initializers are called if they're specified,
irrespective of parallelism. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2f25ef6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2f25ef6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2f25ef6b
Branch: refs/heads/master
Commit: 2f25ef6b81650dff45849fd086d5265830863141
Parents: 8a8ba65
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Nov 22 10:56:02 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Nov 22 10:56:35 2013 -0800
----------------------------------------------------------------------
.../app/dag/impl/RootInputVertexManager.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 +++++++++++++++-----
.../org/apache/tez/mapreduce/input/MRInput.java | 2 +-
3 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 0661b7c..0d1aabb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -45,7 +45,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class RootInputVertexManager implements VertexScheduler {
-
+
private final Vertex managedVertex;
private final EventMetaData sourceInfo;
private final Map<String, EventMetaData> destInfoMap;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3eadded..179ed1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -464,6 +464,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final String javaOpts;
private final ContainerContext containerContext;
private VertexTerminationCause terminationCause;
+
+ private String logIdentifier;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration conf, EventHandler eventHandler,
@@ -528,6 +530,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
setAdditionalOutputs(vertexPlan.getOutputsList());
}
+ logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -1287,15 +1290,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// setup vertex scheduler
// TODO this needs to consider data size and perhaps API.
// Currently implicitly BIPARTITE is the only edge type
- LOG.info("Setting vertexManager to ShuffleVertexManager");
+ LOG.info("Setting vertexManager to ShuffleVertexManager for " + vertex.logIdentifier);
vertex.vertexScheduler = new ShuffleVertexManager(vertex);
} else if (vertex.inputsWithInitializers != null) {
- LOG.info("Setting vertexManager to RootInputVertexManager");
+ LOG.info("Setting vertexManager to RootInputVertexManager for " + vertex.logIdentifier);
vertex.vertexScheduler = new RootInputVertexManager(vertex,
vertex.eventHandler);
} else {
// schedule all tasks upon vertex start
- LOG.info("Setting vertexManager to ImmediateStartVertexManager");
+ LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + vertex.logIdentifier);
vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
}
vertex.vertexScheduler.initialize(vertex.conf);
@@ -1369,11 +1372,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return VertexState.INITIALIZING;
} else {
- vertex.createTasks();
+ if (vertex.inputsWithInitializers != null) {
+ vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+ vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+ vertex.eventHandler, vertex.getTotalTasks());
+ List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
+ .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+ for (String inputName : vertex.inputsWithInitializers) {
+ inputList.add(vertex.additionalInputs.get(inputName));
+ }
+ LOG.info("Starting root input initializers: "
+ + vertex.inputsWithInitializers.size());
+ vertex.rootInputInitializer.runInputInitializers(inputList);
+ vertex.createTasks();
+ return VertexState.INITIALIZING;
+ } else {
+ vertex.createTasks();
+ return vertex.initializeVertex();
+ }
}
-
-
- return vertex.initializeVertex();
}
} // end of InitTransition
@@ -1727,7 +1744,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public VertexState transition(VertexImpl vertex, VertexEvent event) {
boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
- LOG.info("Num completed Tasks for " + vertex.getVertexId() + " [" + vertex.getName() + "] : "
+ LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
+ vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
Task task = vertex.tasks.get(taskEvent.getTaskID());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f25ef6b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 3c6f46b..5db41d9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -123,7 +123,7 @@ public class MRInput implements LogicalInput {
MRInputUserPayloadProto mrUserPayload =
MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
- "All split information not expected in MRInput");
+ "Split information not expected in MRInput");
Configuration conf =
MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);