You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/06/03 04:49:30 UTC
git commit: TEZ-1164. Only events for tasks should be buffered in
Initializing state (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master cf073633e -> beca145bb
TEZ-1164. Only events for tasks should be buffered in Initializing state (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/beca145b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/beca145b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/beca145b
Branch: refs/heads/master
Commit: beca145bbe61bc60a0e923072f95d8423f955ddc
Parents: cf07363
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jun 2 19:49:22 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jun 2 19:49:22 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 52 +++++++++++---------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 22 +++++++++
2 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/beca145b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 21c3cc1..dcdbe31 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -319,7 +319,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
VertexEventType.V_ROUTE_EVENT,
- new RouteEventsWhileInitializingTransition())
+ ROUTE_EVENT_TRANSITION)
.addTransition(VertexState.INITIALIZING, VertexState.KILLED,
VertexEventType.V_TERMINATE,
new TerminateInitingVertexTransition())
@@ -2618,12 +2618,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return false;
}
- // Vertex will be moving to INITED state, safe to process pending route events.
- if (!pendingRouteEvents.isEmpty()) {
- VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
- new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
- pendingRouteEvents.clear();
- }
+// // Vertex will be moving to INITED state, safe to process pending route events.
+// if (!pendingRouteEvents.isEmpty()) {
+// VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+// new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+// pendingRouteEvents.clear();
+// }
return true;
}
@@ -3208,17 +3208,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- private static class RouteEventsWhileInitializingTransition implements
- SingleArcTransition<VertexImpl, VertexEvent> {
-
- @Override
- public void transition(VertexImpl vertex, VertexEvent event) {
- VertexEventRouteEvent re = (VertexEventRouteEvent) event;
- // Store the events for post-init routing, since INIT state is when
- // initial task parallelism will be set
- vertex.pendingRouteEvents.addAll(re.getEvents());
- }
- }
+// private static class RouteEventsWhileInitializingTransition implements
+// SingleArcTransition<VertexImpl, VertexEvent> {
+//
+// @Override
+// public void transition(VertexImpl vertex, VertexEvent event) {
+// VertexEventRouteEvent re = (VertexEventRouteEvent) event;
+// // Store the events for post-init routing, since INIT state is when
+// // initial task parallelism will be set
+// vertex.pendingRouteEvents.addAll(re.getEvents());
+// }
+// }
private static class RouteEventTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@@ -3301,12 +3301,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
- checkEventSourceMetadata(vertex, sourceMeta);
- RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
- .getEvent();
- TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
- riEvent.getTargetIndex());
- vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));
+ if (vertex.tasksNotYetScheduled) {
+ vertex.pendingTaskEvents.add(tezEvent);
+ } else {
+ checkEventSourceMetadata(vertex, sourceMeta);
+ RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
+ .getEvent();
+ TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
+ riEvent.getTargetIndex());
+ vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));
+ }
break;
case VERTEX_MANAGER_EVENT:
{
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/beca145b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 06b9b47..31be599 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -124,6 +124,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -2541,6 +2542,8 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITED, v1.getState());
Assert.assertEquals(5, v1.getTotalTasks());
+ // task events get buffered
+ Assert.assertEquals(5, v1.pendingTaskEvents.size());
Assert.assertEquals(RootInputVertexManager.class.getName(), v1
.getVertexManager().getPlugin().getClass().getName());
for (int i=0; i < v1Hints.size(); ++i) {
@@ -2550,12 +2553,31 @@ public class TestVertexImpl {
VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+
+ // non-task events dont get buffered
+ List<TezEvent> events = Lists.newLinkedList();
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ events.add(new TezEvent(
+ new VertexManagerEvent("vertex2", new byte[0]), new EventMetaData(
+ EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2",
+ ta0_t0_v1)));
+ events.add(new TezEvent(new RootInputDataInformationEvent(0, new byte[0]),
+ new EventMetaData(EventProducerConsumerType.INPUT, "vertex2",
+ "NULL_VERTEX", null)));
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v2.getVertexId(), events));
+ dispatcher.await();
+ Assert.assertEquals(1, v2.pendingTaskEvents.size());
+
RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
runner2.completeInputInitialization(10, v2Hints);
Assert.assertEquals(VertexState.INITED, v2.getState());
Assert.assertEquals(10, v2.getTotalTasks());
+ // task events get buffered
+ Assert.assertEquals(11, v2.pendingTaskEvents.size());
Assert.assertEquals(RootInputVertexManager.class.getName(), v2
.getVertexManager().getPlugin().getClass().getName());
for (int i=0; i < v2Hints.size(); ++i) {