You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2014/09/19 08:13:05 UTC
git commit: TEZ-1597. ImmediateStartVertexManager should handle
corner case of vertex having zero tasks (Rajesh Balamohan)
Repository: tez
Updated Branches:
refs/heads/master 625450cf1 -> 82ec16baf
TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82ec16ba
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82ec16ba
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82ec16ba
Branch: refs/heads/master
Commit: 82ec16baf890a6914337544ff3f6a7715aa1b86e
Parents: 625450c
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Sep 19 11:42:45 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Sep 19 11:42:45 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/impl/ImmediateStartVertexManager.java | 9 +++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 45 ++++++++++++++++++++
3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5e2c2cd..3ae8f59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,7 @@ ALL CHANGES
TEZ-1585. Memory leak in tez session mode.
TEZ-1533. Request Events more often if a complete set of events is received by a task.
TEZ-1587. Some tez-examples fail in local mode.
+ TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks.
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index ac2b851..773426b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -64,7 +64,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
String srcVertex = entry.getKey();
EdgeProperty edgeProp = entry.getValue();
- srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+ LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
+ //track vertices with task count > 0
+ if (getContext().getVertexNumTasks(srcVertex) > 0) {
+ srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+ } else {
+ LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertex
+ + " as it has got 0 tasks");
+ }
}
//handle completions
http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c003e05..5c00fec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3642,6 +3642,8 @@ public class TestVertexImpl {
* M7 --(B)---------------->M5 ---(SG)--> R6
* /
* M8---(C)--------------->/
+ * /
+ * M9---(B)--------------> (zero task vertex)
*/
//init M2
@@ -3650,15 +3652,23 @@ public class TestVertexImpl {
VertexImpl r3 = vertices.get("R3");
VertexImpl m5 = vertices.get("M5");
VertexImpl m8 = vertices.get("M8");
+ VertexImpl m9 = vertices.get("M9");
initVertex(m2);
initVertex(m7);
initVertex(m8);
+ initVertex(m9);
assertTrue(m7.getState().equals(VertexState.INITED));
+ assertTrue(m9.getState().equals(VertexState.INITED));
assertTrue(m5.getState().equals(VertexState.INITED));
assertTrue(m8.getState().equals(VertexState.INITED));
assertTrue(m7.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
+ //Start M9
+ dispatcher.getEventHandler().handle(new VertexEvent(m9.getVertexId(),
+ VertexEventType.V_START));
+ dispatcher.await();
+
//Start M2; Let tasks complete in M2; Also let 1 task complete in R3
dispatcher.getEventHandler().handle(new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
dispatcher.await();
@@ -3708,6 +3718,8 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(new VertexEvent(m8.getVertexId(),VertexEventType.V_START));
dispatcher.await();
+ assertTrue(m9.getState().equals(VertexState.SUCCEEDED));
+
//M5 in running state. But tasks should not be scheduled until M8 finishes a task.
assertTrue(m5.getState().equals(VertexState.RUNNING));
for(Task task : m5.getTasks().values()) {
@@ -3740,6 +3752,8 @@ public class TestVertexImpl {
* M7 --(B)---------------->M5 ---(SG)--> R6
* /
* M8---(C)--------------->/
+ * /
+ * M9---(B)--------------> (zero task vertex)
*/
DAGPlan dag = DAGPlan.newBuilder().setName("TestSamplerDAG")
.addVertex(VertexPlan.newBuilder()
@@ -3778,6 +3792,24 @@ public class TestVertexImpl {
.addOutEdgeId("M8_M5")
.build()
)
+ .addVertex(VertexPlan.newBuilder()
+ .setName("M9")
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("M9.class"))
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
+ .setTaskConfig(PlanTaskConfiguration.newBuilder()
+ .setNumTasks(0) //Zero task vertex
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("M9.class")
+ .build()
+ )
+ .addOutEdgeId("M9_M5")
+ .build()
+ )
.addVertex(VertexPlan.newBuilder()
.setName("R3")
.setProcessorDescriptor(
@@ -3815,6 +3847,7 @@ public class TestVertexImpl {
.addInEdgeId("R3_M5")
.addInEdgeId("M7_M5")
.addInEdgeId("M8_M5")
+ .addInEdgeId("M9_M5")
.addOutEdgeId("M5_R6")
.build()
)
@@ -3904,6 +3937,18 @@ public class TestVertexImpl {
)
.addEdge(
EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5"))
+ .setInputVertexName("M9")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5.class"))
+ .setOutputVertexName("M5")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("M9_M5")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5"))
.setInputVertexName("M8")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class"))