You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/25 23:53:59 UTC
git commit: TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas)
Repository: tez
Updated Branches:
refs/heads/master 51c8a8b71 -> b6790520d
TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6790520
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6790520
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6790520
Branch: refs/heads/master
Commit: b6790520ddccb499e590efd3b567751e405a833a
Parents: 51c8a8b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 25 14:53:44 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 25 14:53:44 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 +++++-------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 106 +++++++++++++++++++
3 files changed, 137 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7fa961..f790290 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,8 @@ ALL CHANGES:
TEZ-1240. Add system test for propagation of diagnostics for errors
TEZ-1618. LocalTaskSchedulerService.getTotalResources() and getAvailableResources() can get negative if JVM memory is larger than 2GB
TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials
+ TEZ-1592. Vertex should wait for all initializers to finish before moving to
+ INITED state
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ab22099..34fffd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2670,25 +2670,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " to set #tasks for the vertex " + vertex.getVertexId());
if (vertex.inputsWithInitializers != null) {
- // Use DAGScheduler to arbitrate resources among vertices later
- vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
- vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
- vertex.eventHandler, -1,
- vertex.appContext.getTaskScheduler().getNumClusterNodes(),
- vertex.getTaskResource(),
- vertex.appContext.getTaskScheduler().getTotalResources());
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
- inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
- for (String inputName : vertex.inputsWithInitializers) {
- inputList.add(vertex.rootInputDescriptors.get(inputName));
- }
- LOG.info("Vertex will initialize via inputInitializers "
- + vertex.logIdentifier + ". Starting root input initializers: "
- + vertex.inputsWithInitializers.size());
- vertex.rootInputInitializerManager.runInputInitializers(inputList);
- // Send pending rootInputInitializerEvents
- vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
- vertex.pendingInitializerEvents.clear();
+ vertex.setupInputInitializerManager();
return VertexState.INITIALIZING;
} else {
boolean hasOneToOneUninitedSource = false;
@@ -2716,27 +2698,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} else {
LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
vertex.createTasks();
+
if (vertex.inputsWithInitializers != null) {
- vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
- vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
- vertex.eventHandler, vertex.getTotalTasks(),
- vertex.appContext.getTaskScheduler().getNumClusterNodes(),
- vertex.getTaskResource(),
- vertex.appContext.getTaskScheduler().getTotalResources());
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
- inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
- for (String inputName : vertex.inputsWithInitializers) {
- inputList.add(vertex.rootInputDescriptors.get(inputName));
- }
- LOG.info("Starting root input initializers: "
- + vertex.inputsWithInitializers.size());
- // special case when numTasks>0 and still we want to stay in initializing
- // state. This is handled in RootInputInitializedTransition specially.
- vertex.initWaitsForRootInitializers = true;
- vertex.rootInputInitializerManager.runInputInitializers(inputList);
- // Send pending rootInputInitializerEvents
- vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents);
- vertex.pendingInitializerEvents.clear();
+ vertex.setupInputInitializerManager();
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2832,8 +2796,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// done. check if we need to do the initialization
if (vertex.getState() == VertexState.INITIALIZING &&
vertex.initWaitsForRootInitializers) {
- // set the wait flag to false
- vertex.initWaitsForRootInitializers = false;
+ if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
+ // set the wait flag to false if all initializers are done
+ vertex.initWaitsForRootInitializers = false;
+ }
// initialize vertex if possible and needed
if (vertex.canInitVertex()) {
Preconditions.checkState(vertex.numTasks >= 0,
@@ -3572,6 +3538,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ private void setupInputInitializerManager() {
+ rootInputInitializerManager = createRootInputInitializerManager(
+ getDAG().getName(), getName(), getVertexId(),
+ eventHandler, getTotalTasks(),
+ appContext.getTaskScheduler().getNumClusterNodes(),
+ getTaskResource(),
+ appContext.getTaskScheduler().getTotalResources());
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
+ for (String inputName : inputsWithInitializers) {
+ inputList.add(rootInputDescriptors.get(inputName));
+ }
+ LOG.info("Vertex will initialize via inputInitializers "
+ + logIdentifier + ". Starting root input initializers: "
+ + inputsWithInitializers.size());
+ initWaitsForRootInitializers = true;
+ rootInputInitializerManager.runInputInitializers(inputList);
+ // Send pending rootInputInitializerEvents
+ rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents);
+ pendingInitializerEvents.clear();
+ }
+
private static class VertexStateChangedCallback
implements OnStateChangedCallback<VertexState, VertexImpl> {
http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a1b9847..e71acb6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -496,6 +496,52 @@ public class TestVertexImpl {
return dag;
}
+ private DAGPlan createDAGPlanWithMultipleInitializers(String initializerClassName) {
+ LOG.info("Setting up dag plan with multiple input initializer");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("testVertexWithMultipleInitializers")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ initializerClassName))
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ ).build()
+ )
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ initializerClassName))
+ .setName("input2")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ ).build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .build()
+ ).build();
+ return dag;
+ }
+
private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
LOG.info("Setting up dag plan with input initializer");
DAGPlan dag = DAGPlan.newBuilder()
@@ -3969,6 +4015,60 @@ public class TestVertexImpl {
return dag;
}
+ @Test(timeout = 5000)
+ public void testVertexWithMultipleInitializers1() {
+ useCustomInitializer = true;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
+ setupPostDagCreation();
+
+ VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+ .get("vertex1");
+
+ dispatcher.getEventHandler().handle(
+ new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+ RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+ List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+
+ // Complete initializer which sets parallelism first
+ initializerManager1.completeInputInitialization(0, 5, v1Hints);
+ Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+ // Complete second initializer
+ initializerManager1.completeInputInitialization(1);
+ Assert.assertEquals(VertexState.INITED, v1.getState());
+ }
+
+ @Test(timeout = 5000)
+ public void testVertexWithMultipleInitializers2() {
+ useCustomInitializer = true;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
+ setupPostDagCreation();
+
+ VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+ .get("vertex1");
+
+ dispatcher.getEventHandler().handle(
+ new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+ RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+ List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+
+ // Complete initializer which does not set parallelism
+ initializerManager1.completeInputInitialization(1);
+ Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+ // Complete second initializer which sets parallelism
+ initializerManager1.completeInputInitialization(0, 5, v1Hints);
+ Assert.assertEquals(VertexState.INITED, v1.getState());
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexWithInitializerSuccess() {
@@ -4363,6 +4463,12 @@ public class TestVertexImpl {
dispatcher.await();
}
+ public void completeInputInitialization(int initializerIndex) {
+ eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
+ .get(initializerIndex).getName(), null));
+ dispatcher.await();
+ }
+
public void completeInputInitialization(int initializerIndex, int targetTasks,
List<TaskLocationHint> locationHints) {
List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);