You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/12/20 19:59:07 UTC
git commit: TEZ-683. Diamond shape DAG fail. (hitesh)
Updated Branches:
refs/heads/master 27fd81e4f -> 97e9a5ef3
TEZ-683. Diamond shape DAG fail. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/97e9a5ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/97e9a5ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/97e9a5ef
Branch: refs/heads/master
Commit: 97e9a5ef36f94b6620ee91d961b950745470d5b4
Parents: 27fd81e
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Dec 20 10:58:26 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Dec 20 10:58:26 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 352 +++++++++++++++++--
2 files changed, 338 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97e9a5ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5f89df7..6344652 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -211,6 +211,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_INIT,
new InitTransition())
+ .addTransition(VertexState.NEW, VertexState.NEW,
+ VertexEventType.V_SOURCE_VERTEX_STARTED,
+ new SourceVertexStartedTransition())
.addTransition(VertexState.NEW, VertexState.KILLED,
VertexEventType.V_TERMINATE,
new TerminateNewVertexTransition())
@@ -322,6 +325,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -358,6 +362,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
@@ -956,8 +961,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (oldState != getInternalState()) {
- LOG.info(vertexId + " transitioned from " + oldState + " to "
- + getInternalState());
+ LOG.info(logIdentifier + " transitioned from " + oldState + " to "
+ + getInternalState() + " due to event "
+ + event.getType());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/97e9a5ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 78c7dcc..61c9745 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -18,8 +18,10 @@
package org.apache.tez.dag.app.dag.impl;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
@@ -76,6 +78,8 @@ import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -104,6 +108,8 @@ import org.junit.Test;
import org.mockito.internal.util.collections.Sets;
import com.google.common.collect.Lists;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestVertexImpl {
@@ -123,13 +129,13 @@ public class TestVertexImpl {
private Clock clock = new SystemClock();
private TaskHeartbeatHandler thh;
private AppContext appContext;
- private VertexLocationHint vertexLocationHint;
+ private VertexLocationHint vertexLocationHint = null;
private Configuration conf;
private Map<String, Edge> edges;
+ private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
-
private DagEventDispatcher dagEventDispatcher;
private class CountingVertexOutputCommitter extends
@@ -190,6 +196,18 @@ public class TestVertexImpl {
}
}
+ private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ VertexImpl vertex = vertexIdMap.get(
+ event.getTaskAttemptID().getTaskID().getVertexID());
+ Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
+ ((EventHandler<TaskAttemptEvent>)task.getAttempt(
+ event.getTaskAttemptID())).handle(event);
+ }
+ }
+
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@SuppressWarnings("unchecked")
@Override
@@ -295,25 +313,26 @@ public class TestVertexImpl {
.setType(PlanVertexType.NORMAL)
.addInputs(
RootInputLeafOutputProto.newBuilder()
- .setInitializerClassName(initializerClassName)
- .setName("input2")
- .setEntityDescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
- ).build()
- )
+ .setInitializerClassName(initializerClassName)
+ .setName("input2")
+ .setEntityDescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
+ )
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x2.y2")
- .build()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
)
.addInEdgeId("e1")
- .build()
+ .build()
)
.addEdge(
EdgePlan.newBuilder()
@@ -327,7 +346,7 @@ public class TestVertexImpl {
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
- .build();
+ .build();
return dag;
}
@@ -692,6 +711,230 @@ public class TestVertexImpl {
return dag;
}
+ // Create a plan with 3 vertices: A, B, C
+ // A -> B, A -> C, B -> C
+ private DAGPlan createSamplerDAGPlan() {
+ LOG.info("Setting up dag plan");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("TestSamplerDAG")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("A")
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host1")
+ .addRack("rack1")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("A.class")
+ .build()
+ )
+ .addOutEdgeId("A_B")
+ .addOutEdgeId("A_C")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("B")
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host2")
+ .addRack("rack2")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("")
+ .build()
+ )
+ .addInEdgeId("A_B")
+ .addOutEdgeId("B_C")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("C")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host3")
+ .addRack("rack3")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("foo")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .addInEdgeId("A_C")
+ .addInEdgeId("B_C")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+ .setInputVertexName("A")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+ .setOutputVertexName("B")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("A_B")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_C"))
+ .setInputVertexName("A")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
+ .setOutputVertexName("C")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("A_C")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+ .setInputVertexName("B")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+ .setOutputVertexName("C")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("B_C")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+
+ return dag;
+ }
+
+ // Create a plan with 3 vertices: A, B, C
+ // A -> C, B -> C
+ private DAGPlan createSamplerDAGPlan2() {
+ LOG.info("Setting up dag plan");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("TestSamplerDAG")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("A")
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host1")
+ .addRack("rack1")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("A.class")
+ .build()
+ )
+ .addOutEdgeId("A_C")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("B")
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host2")
+ .addRack("rack2")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("")
+ .build()
+ )
+ .addOutEdgeId("B_C")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("C")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host3")
+ .addRack("rack3")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("foo")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .addInEdgeId("A_C")
+ .addInEdgeId("B_C")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_C"))
+ .setInputVertexName("A")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
+ .setOutputVertexName("C")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("A_C")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+ .setInputVertexName("B")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+ .setOutputVertexName("C")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("B_C")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+
+ return dag;
+ }
+
private void setupVertices() {
int vCnt = dagPlan.getVertexCount();
LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
@@ -701,15 +944,17 @@ public class TestVertexImpl {
VertexPlan vPlan = dagPlan.getVertex(i);
String vName = vPlan.getName();
TezVertexID vertexId = TezVertexID.getInstance(dagId, i+1);
- VertexImpl v;
+ VertexImpl v = null;
+ VertexLocationHint locationHint = DagTypeConverters.convertFromDAGPlan(
+ vPlan.getTaskLocationHintList());
if (useCustomInitializer) {
v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
- clock, thh, appContext, vertexLocationHint, dispatcher);
+ clock, thh, appContext, locationHint, dispatcher);
} else {
v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
- clock, thh, appContext, vertexLocationHint);
+ clock, thh, appContext, locationHint);
}
vertices.put(vName, v);
vertexIdMap.put(vertexId, v);
@@ -780,8 +1025,20 @@ public class TestVertexImpl {
doReturn(dagId).when(dag).getID();
doReturn(taskScheduler).when(appContext).getTaskScheduler();
doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+
setupVertices();
-
+ when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
+ @Override
+ public Vertex answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ if (args.length != 1) {
+ return null;
+ }
+ TezVertexID vId = (TezVertexID) args[0];
+ return vertexIdMap.get(vId);
+ }
+ });
+
// TODO - this test logic is tightly linked to impl DAGImpl code.
edges = new HashMap<String, Edge>();
for (EdgePlan edgePlan : dagPlan.getEdgeList()) {
@@ -791,6 +1048,8 @@ public class TestVertexImpl {
}
parseVertexEdges();
+ taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+ dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
vertexEventDispatcher = new VertexEventDispatcher();
@@ -1746,4 +2005,53 @@ public class TestVertexImpl {
}
}
+ @Test(timeout=5000)
+ public void testInitStartRace() {
+ // Race when a source vertex manages to start before the target vertex has
+ // been initialized
+ setupPreDagCreation();
+ dagPlan = createSamplerDAGPlan();
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_START));
+
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.RUNNING, vB.getState());
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+ }
+
+ @Test(timeout=5000)
+ public void testInitStartRace2() {
+ // Race when a source vertex manages to start before the target vertex has
+ // been initialized
+ setupPreDagCreation();
+ dagPlan = createSamplerDAGPlan2();
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_START));
+ dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+ VertexEventType.V_START));
+
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.RUNNING, vB.getState());
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+ }
}