You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:47 UTC

[09/43] tez git commit: TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu)

TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/02870f0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/02870f0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/02870f0a

Branch: refs/heads/TEZ-2003
Commit: 02870f0ac1095d67a85a864860b0c4ce68a1db57
Parents: d5a0f39
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 7 13:03:07 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 7 13:03:07 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 35 +++++---------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  8 +++++
 .../app/TestTaskAttemptListenerImplTezDag.java  | 21 +++++++-----
 4 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58648e4..7feefcc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent
   TEZ-2424. Bump up max counter group name length limit to account for per_io counters.
   TEZ-2417. Tez UI: Counters are blank in the Attempts page if all attempts failed
   TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333

http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d96da83..b38081b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -423,12 +423,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         }
 
         List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+        // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
+        // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+        // to VertexImpl to ensure the events ordering
+        //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
+        //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
         for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
           final EventType eventType = tezEvent.getEventType();
-          if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
-              eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
-            context.getEventHandler()
-                .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+          if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+            TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+                    (TaskStatusUpdateEvent) tezEvent.getEvent());
+            context.getEventHandler().handle(taskAttemptEvent);
           } else {
             otherEvents.add(tezEvent);
           }
@@ -453,28 +458,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  private TaskAttemptEvent getTaskAttemptEventFromTezEvent(TezTaskAttemptID taskAttemptID,
-                                                           TezEvent tezEvent) {
-    final EventType eventType = tezEvent.getEventType();
-    TaskAttemptEvent taskAttemptEvent;
-    switch (eventType) {
-      case TASK_STATUS_UPDATE_EVENT:
-        {
-          taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
-              (TaskStatusUpdateEvent) tezEvent.getEvent());
-        }
-        break;
-      case TASK_ATTEMPT_COMPLETED_EVENT:
-        {
-          taskAttemptEvent = new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE);
-        }
-        break;
-      default:
-        throw new TezUncheckedException("unknown event type " + eventType);
-    }
-    return taskAttemptEvent;
-  }
-
   private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource> ylrs)
       throws IOException {
     Map<String, TezLocalResource> tlrs = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9ed7441..5d61642 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -132,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -4131,6 +4132,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
               );
         }
         break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        {
+          checkEventSourceMetadata(vertex, sourceMeta);
+          vertex.getEventHandler().handle(
+              new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
+        }
+        break;
       default:
         throw new TezUncheckedException("Unhandled tez event type: "
             + tezEvent.getEventType());

http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ec4f99a..f974f40 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -53,6 +54,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -179,8 +181,9 @@ public class TestTaskAttemptListenerImplTezDag {
   @Test (timeout = 5000)
   public void testTaskEventRouting() throws Exception {
     List<TezEvent> events =  Arrays.asList(
-      new TezEvent(InputInitializerEvent.create("test_vertex", "test_input", null), null),
-      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null)
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
+      new TezEvent(new TaskAttemptCompletedEvent(), null)
     );
 
     EventHandler eventHandler = generateHeartbeat(events);
@@ -193,13 +196,15 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
         statusUpdateEvent.getType());
 
-
     final Event vertexEvent = argAllValues.get(1);
     final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
-    assertEquals("Other events should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
         vertexEvent.getType());
-    assertEquals(EventType.ROOT_INPUT_INITIALIZER_EVENT,
+    assertEquals(EventType.DATA_MOVEMENT_EVENT,
         vertexRouteEvent.getEvents().get(0).getEventType());
+    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+        vertexRouteEvent.getEvents().get(1).getEventType());
+
   }
 
   @Test (timeout = 5000)
@@ -213,9 +218,9 @@ public class TestTaskAttemptListenerImplTezDag {
     verify(eventHandler, times(1)).handle(arg.capture());
     final List<Event> argAllValues = arg.getAllValues();
 
-    final Event statusUpdateEvent = argAllValues.get(0);
-    assertEquals("only event should be task done", TaskAttemptEventType.TA_DONE,
-        statusUpdateEvent.getType());
+    final Event event = argAllValues.get(0);
+    assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+        event.getType());
   }
 
   private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException {