You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:50 UTC
[05/25] git commit: TEZ-1494. DAG hangs waiting for
ShuffleManager.getNextInput() (Rajesh Balamohan)
TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput() (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fcc74261
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fcc74261
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fcc74261
Branch: refs/heads/TEZ-8
Commit: fcc74261a0d76d00580f4d0dca042a1ed014ccec
Parents: bf6ac4e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Sep 12 04:09:21 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Sep 12 04:09:21 2014 +0530
----------------------------------------------------------------------
.../dag/impl/ImmediateStartVertexManager.java | 92 ++++-
.../app/dag/impl/RootInputVertexManager.java | 27 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 364 +++++++++++++++++--
3 files changed, 417 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fcc74261/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index b202d70..ac2b851 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -18,9 +18,11 @@
package org.apache.tez.dag.app.dag.impl;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -28,29 +30,101 @@ import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
/**
* Starts all tasks immediately on vertex start
*/
public class ImmediateStartVertexManager extends VertexManagerPlugin {
+ private static final Log LOG = LogFactory.getLog(ImmediateStartVertexManager.class);
+
+ private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
+ private int managedTasks;
+ private boolean tasksScheduled = false;
+
+ class SourceVertexInfo {
+ EdgeProperty edgeProperty;
+ int numFinishedTasks;
+
+ SourceVertexInfo(EdgeProperty edgeProperty) {
+ this.edgeProperty = edgeProperty;
+ }
+ }
+
public ImmediateStartVertexManager(VertexManagerPluginContext context) {
super(context);
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
- int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
- List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
- for (int i=0; i<numTasks; ++i) {
- scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
+ managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+ Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+ for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+ String srcVertex = entry.getKey();
+ EdgeProperty edgeProp = entry.getValue();
+ srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+ }
+
+ //handle completions
+ for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+ for (Integer task : entry.getValue()) {
+ handleSourceTaskFinished(entry.getKey(), task);
+ }
+ }
+ scheduleTasks();
+ }
+
+ private void handleSourceTaskFinished(String vertex, Integer taskId) {
+ SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
+ //Not mandatory to check for duplicate completions here
+ srcInfo.numFinishedTasks++;
+ }
+
+ private void scheduleTasks() {
+ if (!canScheduleTasks()) {
+ return;
+ }
+
+ List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
+ for (int i = 0; i < managedTasks; ++i) {
+ tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+ }
+
+ if (!tasksToStart.isEmpty()) {
+ LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
+ getContext().scheduleVertexTasks(tasksToStart);
}
- getContext().scheduleVertexTasks(scheduledTasks);
+ tasksScheduled = true;
+ }
+
+ private boolean canScheduleTasks() {
+ //Check if at least 1 task is finished from each source vertex (in case of broadcast &
+ // one-to-one or custom)
+ for (Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
+ SourceVertexInfo srcVertexInfo = entry.getValue();
+ switch(srcVertexInfo.edgeProperty.getDataMovementType()) {
+ case ONE_TO_ONE:
+ case BROADCAST:
+ case CUSTOM:
+ if (srcVertexInfo.numFinishedTasks == 0) {
+ //do not schedule tasks until a task from source task is complete
+ return false;
+ }
+ default:
+ break;
+ }
+ }
+ return true;
}
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+ handleSourceTaskFinished(srcVertexName, attemptId);
+ if (!tasksScheduled) {
+ scheduleTasks();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fcc74261/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index e6ffdc5..e850286 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -18,27 +18,23 @@
package org.apache.tez.dag.app.dag.impl;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class RootInputVertexManager extends VertexManagerPlugin {
+public class RootInputVertexManager extends ImmediateStartVertexManager {
private String configuredInputName;
@@ -46,27 +42,6 @@ public class RootInputVertexManager extends VertexManagerPlugin {
super(context);
}
- @Override
- public void initialize() {
- }
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
- int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
- List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
- for (int i=0; i<numTasks; ++i) {
- scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
- }
- getContext().scheduleVertexTasks(scheduledTasks);
- }
-
- @Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
- }
-
- @Override
- public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
- }
@Override
public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
http://git-wip-us.apache.org/repos/asf/tez/blob/fcc74261/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d894928..04e2219 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.nio.ByteBuffer;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
@@ -1879,39 +1880,39 @@ public class TestVertexImpl {
Assert.assertEquals(2, v3.getOutputVerticesCount());
Assert.assertEquals(2, v3.getOutputVerticesCount());
- Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(0)
+ assertTrue("vertex1".equals(v3.getInputSpecList(0).get(0)
.getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList(0).get(0)
- .getSourceVertexName()));
- Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(1)
+ .getSourceVertexName()));
+ assertTrue("vertex1".equals(v3.getInputSpecList(0).get(1)
.getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList(0).get(1)
- .getSourceVertexName()));
- Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(0)
+ .getSourceVertexName()));
+ assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(0)
.getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList(0).get(0)
- .getInputDescriptor().getClassName()));
- Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(1)
+ .getInputDescriptor().getClassName()));
+ assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(1)
.getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList(0).get(1)
- .getInputDescriptor().getClassName()));
+ .getInputDescriptor().getClassName()));
- Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(0)
+ assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(0)
.getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList(0).get(0)
- .getDestinationVertexName()));
- Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(1)
+ .getDestinationVertexName()));
+ assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(1)
.getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList(0).get(1)
- .getDestinationVertexName()));
- Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(0)
+ .getDestinationVertexName()));
+ assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(0)
.getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList(0).get(0)
- .getOutputDescriptor().getClassName()));
- Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(1)
+ .getOutputDescriptor().getClassName()));
+ assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(1)
.getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList(0).get(1)
- .getOutputDescriptor().getClassName()));
+ .getOutputDescriptor().getClassName()));
}
@Test(timeout = 5000)
@@ -1940,13 +1941,13 @@ public class TestVertexImpl {
Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
Collections.singletonMap(
v1.getName(), mockEdgeManagerDescriptor);
- Assert.assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
- Assert.assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
+ assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
+ assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
EdgeManagerForTest);
Assert.assertEquals(1, v3.getTotalTasks());
Assert.assertEquals(1, tasks.size());
// the last one is removed
- Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
+ assertTrue(tasks.keySet().iterator().next().equals(firstTask));
}
@@ -1993,7 +1994,7 @@ public class TestVertexImpl {
Edge edge = edges.get("e4");
EdgeManagerPlugin em = edge.getEdgeManager();
EdgeManagerForTest originalEm = (EdgeManagerForTest) em;
- Assert.assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
+ assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
.getUserPayload().deepCopyAsArray()));
UserPayload userPayload = UserPayload.create(ByteBuffer.wrap(new String("foo").getBytes()));
@@ -2007,7 +2008,7 @@ public class TestVertexImpl {
Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
- Assert.assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
+ assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
edgeManagerDescriptors, null)); // Must decrease.
VertexImpl v5Impl = (VertexImpl) v5;
@@ -2015,10 +2016,10 @@ public class TestVertexImpl {
EdgeManagerPlugin modifiedEdgeManager = v5Impl.sourceVertices.get(v3)
.getEdgeManager();
Assert.assertNotNull(modifiedEdgeManager);
- Assert.assertTrue(modifiedEdgeManager instanceof EdgeManagerForTest);
+ assertTrue(modifiedEdgeManager instanceof EdgeManagerForTest);
// Ensure initialize() is called with the correct payload
- Assert.assertTrue(Arrays.equals(userPayload.deepCopyAsArray(),
+ assertTrue(Arrays.equals(userPayload.deepCopyAsArray(),
((EdgeManagerForTest) modifiedEdgeManager).getUserPayload().deepCopyAsArray()));
}
@@ -2092,7 +2093,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
String diagnostics =
StringUtils.join(v.getDiagnostics(), ",").toLowerCase();
- Assert.assertTrue(diagnostics.contains("task failed"
+ assertTrue(diagnostics.contains("task failed"
+ ", taskid=" + t1.toString()));
}
@@ -2104,7 +2105,7 @@ public class TestVertexImpl {
String diagnostics =
StringUtils.join(v2.getDiagnostics(), ",").toLowerCase();
LOG.info("diagnostics v2: " + diagnostics);
- Assert.assertTrue(diagnostics.contains(
+ assertTrue(diagnostics.contains(
"vertex received kill in inited state"));
}
@@ -2118,7 +2119,7 @@ public class TestVertexImpl {
killVertex(v3);
String diagnostics =
StringUtils.join(v3.getDiagnostics(), ",").toLowerCase();
- Assert.assertTrue(diagnostics.contains(
+ assertTrue(diagnostics.contains(
"vertex received kill while in running state"));
}
@@ -2200,7 +2201,7 @@ public class TestVertexImpl {
Assert.assertNull(v2.getOutputCommitter("output"));
VertexImpl v6 = vertices.get("vertex6");
- Assert.assertTrue(v6.getOutputCommitter("outputx")
+ assertTrue(v6.getOutputCommitter("outputx")
instanceof CountingOutputCommitter);
}
@@ -2208,11 +2209,11 @@ public class TestVertexImpl {
public void testVertexManagerInit() {
initAllVertices(VertexState.INITED);
VertexImpl v2 = vertices.get("vertex2");
- Assert.assertTrue(v2.getVertexManager().getPlugin()
+ assertTrue(v2.getVertexManager().getPlugin()
instanceof ImmediateStartVertexManager);
VertexImpl v6 = vertices.get("vertex6");
- Assert.assertTrue(v6.getVertexManager().getPlugin()
+ assertTrue(v6.getVertexManager().getPlugin()
instanceof ShuffleVertexManager);
}
@@ -3020,6 +3021,306 @@ public class TestVertexImpl {
}
}
+ @Test(timeout = 5000)
+ /**
+ * Ref: TEZ-1494
+ * If broadcast, one-to-one or custom edges are present in source, tasks should not start until
+ * 1 task from each source vertex is complete.
+ */
+ public void testTaskSchedulingWithCustomEdges() {
+ setupPreDagCreation();
+ dagPlan = createCustomDAGWithCustomEdges();
+ setupPostDagCreation();
+
+ /**
+ *
+ * M2 --(SG)--> R3 --(B)--\
+ * \
+ * M7 --(B)---------------->M5 ---(SG)--> R6
+ * /
+ * M8---(C)--------------->/
+ */
+
+ //init M2
+ VertexImpl m2 = vertices.get("M2");
+ VertexImpl m7 = vertices.get("M7");
+ VertexImpl r3 = vertices.get("R3");
+ VertexImpl m5 = vertices.get("M5");
+ VertexImpl m8 = vertices.get("M8");
+
+ initVertex(m2);
+ initVertex(m7);
+ initVertex(m8);
+ assertTrue(m7.getState().equals(VertexState.INITED));
+ assertTrue(m5.getState().equals(VertexState.INITED));
+ assertTrue(m8.getState().equals(VertexState.INITED));
+ assertTrue(m7.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
+
+ //Start M2; Let tasks complete in M2; Also let 1 task complete in R3
+ dispatcher.getEventHandler().handle(new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
+ dispatcher.await();
+ VertexEventTaskAttemptCompleted taskAttemptCompleted = new VertexEventTaskAttemptCompleted
+ (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m2.getVertexId(),0), 0), TaskAttemptStateInternal.SUCCEEDED);
+ VertexEventTaskCompleted taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m2
+ .getVertexId(), 0), TaskState.SUCCEEDED);
+ dispatcher.getEventHandler().handle(taskAttemptCompleted);
+ dispatcher.getEventHandler().handle(taskCompleted);
+ dispatcher.await();
+ taskAttemptCompleted = new VertexEventTaskAttemptCompleted
+ (TezTaskAttemptID.getInstance(TezTaskID.getInstance(r3.getVertexId(),0), 0),
+ TaskAttemptStateInternal.SUCCEEDED);
+ taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(r3
+ .getVertexId(), 0), TaskState.SUCCEEDED);
+ dispatcher.getEventHandler().handle(taskAttemptCompleted);
+ dispatcher.getEventHandler().handle(taskCompleted);
+ dispatcher.await();
+ assertTrue(m2.getState().equals(VertexState.SUCCEEDED));
+ assertTrue(m5.numSuccessSourceAttemptCompletions == 1);
+ assertTrue(m5.getState().equals(VertexState.INITED));
+
+ //R3 should be in running state as it has one task completed, and rest are pending
+ assertTrue(r3.getState().equals(VertexState.RUNNING));
+
+ //Let us start M7; M5 should start not start as it is dependent on M8 as well
+ dispatcher.getEventHandler().handle(new VertexEvent(m7.getVertexId(),VertexEventType.V_START));
+ dispatcher.await();
+ //Let one of the tasks get over in M7 as well.
+ taskAttemptCompleted = new VertexEventTaskAttemptCompleted
+ (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m7.getVertexId(),0), 0),
+ TaskAttemptStateInternal.SUCCEEDED);
+ taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m7
+ .getVertexId(), 0), TaskState.SUCCEEDED);
+ dispatcher.getEventHandler().handle(taskAttemptCompleted);
+ dispatcher.getEventHandler().handle(taskCompleted);
+ dispatcher.await();
+ assertTrue(m5.numSuccessSourceAttemptCompletions == 2);
+
+ //M5 should be in INITED state, as it depends on M8
+ assertTrue(m5.getState().equals(VertexState.INITED));
+ for(Task task : m5.getTasks().values()) {
+ assertTrue(task.getState().equals(TaskState.NEW));
+ }
+
+ //Let us start M8; M5 should start now
+ dispatcher.getEventHandler().handle(new VertexEvent(m8.getVertexId(),VertexEventType.V_START));
+ dispatcher.await();
+
+ //M5 in running state. But tasks should not be scheduled until M8 finishes a task.
+ assertTrue(m5.getState().equals(VertexState.RUNNING));
+ for(Task task : m5.getTasks().values()) {
+ assertTrue(task.getState().equals(TaskState.NEW));
+ }
+
+ //Let one of the tasks get over in M8 as well. This should trigger tasks to be scheduled in M5
+ taskAttemptCompleted = new VertexEventTaskAttemptCompleted
+ (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m8.getVertexId(),0), 0),
+ TaskAttemptStateInternal.SUCCEEDED);
+ taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m8
+ .getVertexId(), 0), TaskState.SUCCEEDED);
+ dispatcher.getEventHandler().handle(taskAttemptCompleted);
+ dispatcher.getEventHandler().handle(taskCompleted);
+ dispatcher.await();
+
+ assertTrue(m5.numSuccessSourceAttemptCompletions == 3);
+ //Ensure all tasks in M5 are in scheduled state
+ for(Task task : m5.getTasks().values()) {
+ assertTrue(task.getState().equals(TaskState.SCHEDULED));
+ }
+ }
+
+ //For TEZ-1494
+ private DAGPlan createCustomDAGWithCustomEdges() {
+ /**
+ *
+ * M2 --(SG)--> R3 --(B)--\
+ * \
+ * M7 --(B)---------------->M5 ---(SG)--> R6
+ * /
+ * M8---(C)--------------->/
+ */
+ DAGPlan dag = DAGPlan.newBuilder().setName("TestSamplerDAG")
+ .addVertex(VertexPlan.newBuilder()
+ .setName("M2")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M2.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("M2.class")
+ .build()
+ )
+ .addOutEdgeId("M2_R3")
+ .build()
+ )
+ .addVertex(VertexPlan.newBuilder()
+ .setName("M8")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M8.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("M8.class")
+ .build()
+ )
+ .addOutEdgeId("M8_M5")
+ .build()
+ )
+ .addVertex(VertexPlan.newBuilder()
+ .setName("R3")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M2.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(10)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("R3.class")
+ .build()
+ )
+ .addInEdgeId("M2_R3")
+ .addOutEdgeId("R3_M5")
+ .build()
+ )
+ .addVertex(VertexPlan.newBuilder()
+ .setName("M5")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M5.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(10)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("M5.class")
+ .build()
+ )
+ .addInEdgeId("R3_M5")
+ .addInEdgeId("M7_M5")
+ .addInEdgeId("M8_M5")
+ .addOutEdgeId("M5_R6")
+ .build()
+ )
+ .addVertex(VertexPlan.newBuilder()
+ .setName("M7")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M7.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(10)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("M7.class")
+ .build()
+ )
+ .addOutEdgeId("M7_M5")
+ .build()
+ )
+ .addVertex(VertexPlan.newBuilder()
+ .setName("R6")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("R6.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("R6.class")
+ .build()
+ )
+ .addInEdgeId("M5_R6")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M2_R3"))
+ .setInputVertexName("M2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M2_R3.class"))
+ .setOutputVertexName("R3")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("M2_R3")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("R3_M5"))
+ .setInputVertexName("R3")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("R3_M5.class"))
+ .setOutputVertexName("M5")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("R3_M5")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M7_M5"))
+ .setInputVertexName("M7")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M7_M5.class"))
+ .setOutputVertexName("M5")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("M7_M5")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M5_R6"))
+ .setInputVertexName("M5")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M5_R6.class"))
+ .setOutputVertexName("R6")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("M5_R6")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5"))
+ .setInputVertexName("M8")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class"))
+ .setEdgeManager(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName(EdgeManagerForTest.class.getName())
+ .setUserPayload(ByteString.copyFrom(edgePayload))
+ .build())
+ .setOutputVertexName("M5")
+ .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+ .setId("M8_M5")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+
+ return dag;
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexWithInitializerSuccess() {
@@ -3215,6 +3516,7 @@ public class TestVertexImpl {
dispatcher.getEventHandler(), taskAttemptListener,
clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
updateTracker);
+ v.setInputVertices(new HashMap());
vertexIdMap.put(vId, v);
vertices.put(v.getName(), v);
v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
@@ -3454,8 +3756,8 @@ public class TestVertexImpl {
List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
Assert.assertEquals(1, groupInSpec.size());
Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
- Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
- Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
+ assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
+ assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
groupInSpec.get(0).getMergedInputDescriptor().getClassName().equals("Group.class");
}