You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/29 00:01:47 UTC
git commit: TEZ-1143. 1-1 source split event should be handled in
Vertex.RUNNING state (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master b3a9ec0e7 -> 80b91a4d8
TEZ-1143. 1-1 source split event should be handled in Vertex.RUNNING state (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/80b91a4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/80b91a4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/80b91a4d
Branch: refs/heads/master
Commit: 80b91a4d8ccb211b50901cfc43dc3cfad991ec3f
Parents: b3a9ec0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed May 28 15:00:59 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 28 15:01:36 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 +++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 115 +++++++++++++------
.../tez/test/VertexManagerPluginForTest.java | 46 ++++++++
3 files changed, 151 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 17257ea..1cf63cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -362,6 +362,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.ERROR),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedTransition())
+ .addTransition(VertexState.RUNNING,
+ EnumSet.of(VertexState.RUNNING),
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+ new OneToOneSourceSplitTransition())
.addTransition(VertexState.RUNNING, VertexState.TERMINATING,
VertexEventType.V_TERMINATE,
new VertexKilledTransition())
@@ -539,7 +543,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private RootInputInitializerRunner rootInputInitializer;
- private VertexManager vertexManager;
+ VertexManager vertexManager;
private final UserGroupInformation dagUgi;
@@ -1155,8 +1159,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
edge.startEventBuffering();
}
- LOG.info("Vertex " + getVertexId() +
- " parallelism set to " + parallelism + " from " + numTasks);
// assign to local variable of LinkedHashMap to make sure that changing
// type of task causes compile error. We depend on LinkedHashMap for order
LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
@@ -1179,6 +1181,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Removing task: " + entry.getKey());
iter.remove();
}
+ LOG.info("Vertex " + logIdentifier +
+ " parallelism set to " + parallelism + " from " + numTasks);
this.numTasks = parallelism;
assert tasks.size() == numTasks;
@@ -2587,8 +2591,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventOneToOneSourceSplit splitEvent =
(VertexEventOneToOneSourceSplit)event;
TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
+
if (vertex.originalOneToOneSplitSource != null) {
- Preconditions.checkState(vertex.getState() == VertexState.INITED,
+ VertexState state = vertex.getState();
+ Preconditions.checkState((state == VertexState.INITED || state == VertexState.RUNNING),
" Unexpected 1-1 split for vertex " + vertex.getVertexId() +
" in state " + vertex.getState() +
" . Split in vertex " + originalSplitSource +
@@ -2601,27 +2607,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
" because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
" numTasks " + splitEvent.getNumTasks());
- return VertexState.INITED;
+ return state;
}
// cannot split from multiple sources
throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
" asked to split by: " + originalSplitSource +
" but was already split by:" + vertex.originalOneToOneSplitSource);
}
- Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
- " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
- " in state " + vertex.getState() +
- " . Split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
+
LOG.info("Splitting vertex " + vertex.getVertexId() +
" because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
" numTasks " + splitEvent.getNumTasks());
vertex.originalOneToOneSplitSource = originalSplitSource;
- // ZZZ Can this be handled ?
vertex.setParallelism(splitEvent.getNumTasks(), null, null);
- return vertex.initializeVertexInInitializingState();
+ if (vertex.getState() == VertexState.RUNNING) {
+ return VertexState.RUNNING;
+ } else {
+ Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+ " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+ " in state " + vertex.getState() +
+ " . Split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
+ return vertex.initializeVertexInInitializingState();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a813ebf..05c6569 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -125,6 +125,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.test.EdgeManagerForTest;
+import org.apache.tez.test.VertexManagerPluginForTest;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
@@ -280,7 +281,7 @@ public class TestVertexImpl {
}
}
-
+
private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
@@ -473,44 +474,56 @@ public class TestVertexImpl {
return dag;
}
- private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName) {
+ private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, int numTasks) {
+ VertexPlan.Builder v1Builder = VertexPlan.newBuilder();
+ v1Builder.setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addOutEdgeId("e1")
+ .addOutEdgeId("e2");
+ if (initializerClassName != null) {
+ numTasks = -1;
+ v1Builder.addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setInitializerClassName(initializerClassName)
+ .setName("input1")
+ .setEntityDescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ ).build()
+ ).setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ );
+ } else {
+ v1Builder.setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(numTasks)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ );
+ }
+ VertexPlan v1Plan = v1Builder.build();
+
LOG.info("Setting up one to one dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testVertexOneToOneSplit")
- .addVertex(
- VertexPlan.newBuilder()
- .setName("vertex1")
- .setType(PlanVertexType.NORMAL)
- .addInputs(
- RootInputLeafOutputProto.newBuilder()
- .setInitializerClassName(initializerClassName)
- .setName("input1")
- .setEntityDescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
- ).build()
- )
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x1.y1")
- .build()
- )
- .addOutEdgeId("e1")
- .addOutEdgeId("e2")
- .build()
- )
+ .addVertex(v1Plan)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex2")
.setType(PlanVertexType.NORMAL)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
+ .setNumTasks(numTasks)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
@@ -527,7 +540,7 @@ public class TestVertexImpl {
.setType(PlanVertexType.NORMAL)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
+ .setNumTasks(numTasks)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
@@ -544,7 +557,7 @@ public class TestVertexImpl {
.setType(PlanVertexType.NORMAL)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
+ .setNumTasks(numTasks)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
@@ -2164,7 +2177,7 @@ public class TestVertexImpl {
// vertex with 2 incoming splits from the same source should split once
useCustomInitializer = true;
setupPreDagCreation();
- dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer");
+ dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1);
setupPostDagCreation();
initAllVertices(VertexState.INITIALIZING);
@@ -2198,6 +2211,42 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
}
+
+ @Test(timeout = 5000)
+ public void testVertexWithOneToOneSplitWhileRunning() {
+ int numTasks = 5;
+ // create a diamond shaped dag with 1-1 edges.
+ setupPreDagCreation();
+ dagPlan = createDAGPlanForOneToOneSplit(null, numTasks);
+ setupPostDagCreation();
+ VertexImpl v1 = vertices.get("vertex1");
+ initAllVertices(VertexState.INITED);
+
+ // fudge vertex manager so that tasks don't start running
+ v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(),
+ v1, appContext);
+ startVertex(v1);
+
+ Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
+ Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
+ Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+ // change parallelism of vertex1 while every vertex is RUNNING; the checks
+ // below expect vertex2-4 to pick up the new task count and stay RUNNING
+ int newNumTasks = 3;
+ v1.setParallelism(newNumTasks, null, null);
+ dispatcher.await();
+ Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
+ Assert.assertEquals(newNumTasks, vertices.get("vertex3").getTotalTasks());
+ Assert.assertEquals(newNumTasks, vertices.get("vertex4").getTotalTasks());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
+ Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+ }
@Test(timeout = 5000)
public void testHistoryEventGeneration() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/80b91a4d/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
new file mode 100644
index 0000000..323fd08
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+public class VertexManagerPluginForTest implements VertexManagerPlugin { // test stub: every callback is empty
+ @Override
+ public void initialize(VertexManagerPluginContext context) {/* no-op: the context is not retained */}
+
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions) {/* no-op: nothing is done on vertex start */}
+
+ @Override
+ public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {/* no-op: completions are ignored */}
+
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {/* no-op: events are ignored */}
+
+ @Override
+ public void onRootVertexInitialized(String inputName,
+ InputDescriptor inputDescriptor, List<Event> events) {/* no-op: root input events are ignored */}
+}