You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/06/03 04:49:30 UTC

git commit: TEZ-1164. Only events for tasks should be buffered in Initializing state (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master cf073633e -> beca145bb


TEZ-1164. Only events for tasks should be buffered in Initializing state (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/beca145b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/beca145b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/beca145b

Branch: refs/heads/master
Commit: beca145bbe61bc60a0e923072f95d8423f955ddc
Parents: cf07363
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jun 2 19:49:22 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jun 2 19:49:22 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 52 +++++++++++---------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 22 +++++++++
 2 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/beca145b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 21c3cc1..dcdbe31 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -319,7 +319,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
               VertexEventType.V_ROUTE_EVENT,
-              new RouteEventsWhileInitializingTransition())
+              ROUTE_EVENT_TRANSITION)
           .addTransition(VertexState.INITIALIZING, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
               new TerminateInitingVertexTransition())
@@ -2618,12 +2618,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return false;
     }
 
-    // Vertex will be moving to INITED state, safe to process pending route events.
-    if (!pendingRouteEvents.isEmpty()) {
-      VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
-          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
-      pendingRouteEvents.clear();
-    }
+//    // Vertex will be moving to INITED state, safe to process pending route events.
+//    if (!pendingRouteEvents.isEmpty()) {
+//      VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+//          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+//      pendingRouteEvents.clear();
+//    }
     return true;
   }
 
@@ -3208,17 +3208,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  private static class RouteEventsWhileInitializingTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
-
-    @Override
-    public void transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventRouteEvent re = (VertexEventRouteEvent) event;
-      // Store the events for post-init routing, since INIT state is when
-      // initial task parallelism will be set
-      vertex.pendingRouteEvents.addAll(re.getEvents());
-    }
-  }
+//  private static class RouteEventsWhileInitializingTransition implements
+//      SingleArcTransition<VertexImpl, VertexEvent> {
+//
+//    @Override
+//    public void transition(VertexImpl vertex, VertexEvent event) {
+//      VertexEventRouteEvent re = (VertexEventRouteEvent) event;
+//      // Store the events for post-init routing, since INIT state is when
+//      // initial task parallelism will be set
+//      vertex.pendingRouteEvents.addAll(re.getEvents());
+//    }
+//  }
 
   private static class RouteEventTransition  implements
   SingleArcTransition<VertexImpl, VertexEvent> {
@@ -3301,12 +3301,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           break;
         case ROOT_INPUT_DATA_INFORMATION_EVENT:
-          checkEventSourceMetadata(vertex, sourceMeta);
-          RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
-              .getEvent();
-          TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
-              riEvent.getTargetIndex());
-          vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));          
+          if (vertex.tasksNotYetScheduled) {
+            vertex.pendingTaskEvents.add(tezEvent);
+          } else {
+            checkEventSourceMetadata(vertex, sourceMeta);
+            RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
+                .getEvent();
+            TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
+                riEvent.getTargetIndex());
+            vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));
+          }
           break;
         case VERTEX_MANAGER_EVENT:
         {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/beca145b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 06b9b47..31be599 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -124,6 +124,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -2541,6 +2542,8 @@ public class TestVertexImpl {
 
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
+    // task events get buffered
+    Assert.assertEquals(5, v1.pendingTaskEvents.size());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
         .getVertexManager().getPlugin().getClass().getName());
     for (int i=0; i < v1Hints.size(); ++i) {
@@ -2550,12 +2553,31 @@ public class TestVertexImpl {
     
     VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    
+    // non-task events dont get buffered
+    List<TezEvent> events = Lists.newLinkedList();
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    events.add(new TezEvent(
+        new VertexManagerEvent("vertex2", new byte[0]), new EventMetaData(
+            EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2",
+            ta0_t0_v1)));
+    events.add(new TezEvent(new RootInputDataInformationEvent(0, new byte[0]),
+        new EventMetaData(EventProducerConsumerType.INPUT, "vertex2",
+            "NULL_VERTEX", null)));
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v2.getVertexId(), events));
+    dispatcher.await();
+    Assert.assertEquals(1, v2.pendingTaskEvents.size());
+    
     RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
     runner2.completeInputInitialization(10, v2Hints);
     
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
+    // task events get buffered
+    Assert.assertEquals(11, v2.pendingTaskEvents.size());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
         .getVertexManager().getPlugin().getClass().getName());
     for (int i=0; i < v2Hints.size(); ++i) {