You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/14 07:30:09 UTC

git commit: TEZ-551. Edge should route events to the destination vertex, instead of directly to destination tasks (bikas)

Updated Branches:
  refs/heads/master de57a45f3 -> 305be76dd


TEZ-551. Edge should route events to the destination vertex, instead of directly to destination tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/305be76d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/305be76d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/305be76d

Branch: refs/heads/master
Commit: 305be76dd344b441f81a6a99f4783264a195e8a5
Parents: de57a45
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Oct 13 22:27:49 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Oct 13 22:27:49 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 47 +++++++-------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 67 +++++++++++++-------
 2 files changed, 67 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/305be76d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 9184c0f..3b20e1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -168,41 +168,38 @@ public class Edge {
   public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
     if (!bufferEvents.get()) {
       List<Integer> destTaskIndices = new ArrayList<Integer>();
+      boolean isDataMovementEvent = true;
       switch (tezEvent.getEventType()) {
-      case DATA_MOVEMENT_EVENT:
-        DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
-        TezTaskAttemptID dmSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
-        int dmSourceTaskIndex = dmSourceAttemptId.getTaskID().getId();
-        edgeManager.routeEventToDestinationTasks(dmEvent, dmSourceTaskIndex,
-            destinationVertex.getTotalTasks(), destTaskIndices);
-        for(Integer destTaskIndex : destTaskIndices) {
-          EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
-              destinationVertex.getName(), 
-              sourceVertex.getName(), 
-              null); // will be filled by Task when sending the event. Is it needed?
-          destMeta.setIndex(dmEvent.getTargetIndex());
-          tezEvent.setDestinationInfo(destMeta);
-          Task destTask = destinationVertex.getTask(destTaskIndex);
-          TezTaskID destTaskId = destTask.getTaskId();
-          sendEventToTask(destTaskId, tezEvent);
-        }        
-        break;
       case INPUT_FAILED_EVENT:
-        InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
-        TezTaskAttemptID ifSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
-        int ifSourceTaskIndex = ifSourceAttemptId.getTaskID().getId();
-        edgeManager.routeEventToDestinationTasks(ifEvent, ifSourceTaskIndex,
-            destinationVertex.getTotalTasks(), destTaskIndices);
+        isDataMovementEvent = false;
+      case DATA_MOVEMENT_EVENT:
+        Event event = tezEvent.getEvent();
+        TezTaskAttemptID sourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+        int sourceTaskIndex = sourceAttemptId.getTaskID().getId();
+        if (isDataMovementEvent) {
+          edgeManager.routeEventToDestinationTasks((DataMovementEvent) event,
+              sourceTaskIndex, destinationVertex.getTotalTasks(),
+              destTaskIndices);
+        } else {
+          edgeManager.routeEventToDestinationTasks((InputFailedEvent) event,
+              sourceTaskIndex, destinationVertex.getTotalTasks(),
+              destTaskIndices);
+        }
         for(Integer destTaskIndex : destTaskIndices) {
           EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
               destinationVertex.getName(), 
               sourceVertex.getName(), 
               null); // will be filled by Task when sending the event. Is it needed?
-          destMeta.setIndex(ifEvent.getTargetIndex());
+          if (isDataMovementEvent) {
+            destMeta.setIndex(((DataMovementEvent)event).getTargetIndex());
+          } else {
+            destMeta.setIndex(((InputFailedEvent)event).getTargetIndex());
+          }
           tezEvent.setDestinationInfo(destMeta);
           TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
           sendEventToTask(destTaskId, tezEvent);
         }        
+        break;
       default:
         throw new TezUncheckedException("Unhandled tez event type: "
             + tezEvent.getEventType());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/305be76d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8433477..80f0a97 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1616,11 +1616,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     diagnostics.add(diag);
   }
   
-  private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
+  private static boolean isEventFromVertex(Vertex vertex, 
+      EventMetaData sourceMeta) {
     if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
-      throw new TezUncheckedException("Bad routing of event"
-          + ", Event-vertex=" + sourceMeta.getTaskVertexName()
-          + ", Expected=" + vertex.getName());
+      return false;
+    }
+    return true;
+  }
+
+  private static void checkEventSourceMetadata(Vertex vertex, 
+      EventMetaData sourceMeta) {
+    if (!isEventFromVertex(vertex, sourceMeta)) {
+        throw new TezUncheckedException("Bad routing of event"
+            + ", Event-vertex=" + sourceMeta.getTaskVertexName()
+            + ", Expected=" + vertex.getName());
     }
   }
 
@@ -1651,16 +1660,41 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               + tezEvent.getEventType());
         }
         EventMetaData sourceMeta = tezEvent.getSourceInfo();
+        boolean isDataMovementEvent = true;
         switch(tezEvent.getEventType()) {
+        case INPUT_FAILED_EVENT:
+          isDataMovementEvent = false;
         case DATA_MOVEMENT_EVENT:
           {
-            checkEventSourceMetadata(vertex, sourceMeta);
-            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
-            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
-            dmEvent.setVersion(srcTaId.getId());
-            Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
-                sourceMeta.getEdgeVertexName()));
-            destEdge.sendTezEventToDestinationTasks(tezEvent);
+            if (isEventFromVertex(vertex, sourceMeta)) {
+              // event from this vertex. send to destination vertex
+              TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+              if (isDataMovementEvent) {
+                ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+              } else {
+                ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+              }
+              Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
+              Edge destEdge = vertex.targetVertices.get(destVertex);
+              if (destEdge == null) {
+                throw new TezUncheckedException("Bad destination vertex: " + 
+                    sourceMeta.getEdgeVertexName() + " for event vertex: " +
+                    vertex.getVertexId());
+              }
+              vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
+                  .getVertexId(), Collections.singletonList(tezEvent)));
+            } else {
+              // event not from this vertex. must have come from source vertex.
+              // send to tasks
+              Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+                  sourceMeta.getTaskVertexName()));
+              if (srcEdge == null) {
+                throw new TezUncheckedException("Bad source vertex: " + 
+                    sourceMeta.getTaskVertexName() + " for destination vertex: " +
+                    vertex.getVertexId());
+              }
+              srcEdge.sendTezEventToDestinationTasks(tezEvent);
+            }
           }
           break;
         case VERTEX_MANAGER_EVENT:
@@ -1675,17 +1709,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
         }
           break;
-        case INPUT_FAILED_EVENT:
-        {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
-          InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
-          ifEvent.setVersion(srcTaId.getId());
-          Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
-              sourceMeta.getEdgeVertexName()));
-          destEdge.sendTezEventToDestinationTasks(tezEvent);
-        }
-        break;
         case INPUT_READ_ERROR_EVENT:
           {
             checkEventSourceMetadata(vertex, sourceMeta);