You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/14 07:30:09 UTC
git commit: TEZ-551. Edge should route events to the destination vertex, instead of directly to destination tasks (bikas)
Updated Branches:
refs/heads/master de57a45f3 -> 305be76dd
TEZ-551. Edge should route events to the destination vertex, instead of directly to destination tasks (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/305be76d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/305be76d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/305be76d
Branch: refs/heads/master
Commit: 305be76dd344b441f81a6a99f4783264a195e8a5
Parents: de57a45
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Oct 13 22:27:49 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Oct 13 22:27:49 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/dag/impl/Edge.java | 47 +++++++-------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 67 +++++++++++++-------
2 files changed, 67 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/305be76d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 9184c0f..3b20e1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -168,41 +168,38 @@ public class Edge {
public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
if (!bufferEvents.get()) {
List<Integer> destTaskIndices = new ArrayList<Integer>();
+ boolean isDataMovementEvent = true;
switch (tezEvent.getEventType()) {
- case DATA_MOVEMENT_EVENT:
- DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
- TezTaskAttemptID dmSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
- int dmSourceTaskIndex = dmSourceAttemptId.getTaskID().getId();
- edgeManager.routeEventToDestinationTasks(dmEvent, dmSourceTaskIndex,
- destinationVertex.getTotalTasks(), destTaskIndices);
- for(Integer destTaskIndex : destTaskIndices) {
- EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT,
- destinationVertex.getName(),
- sourceVertex.getName(),
- null); // will be filled by Task when sending the event. Is it needed?
- destMeta.setIndex(dmEvent.getTargetIndex());
- tezEvent.setDestinationInfo(destMeta);
- Task destTask = destinationVertex.getTask(destTaskIndex);
- TezTaskID destTaskId = destTask.getTaskId();
- sendEventToTask(destTaskId, tezEvent);
- }
- break;
case INPUT_FAILED_EVENT:
- InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
- TezTaskAttemptID ifSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
- int ifSourceTaskIndex = ifSourceAttemptId.getTaskID().getId();
- edgeManager.routeEventToDestinationTasks(ifEvent, ifSourceTaskIndex,
- destinationVertex.getTotalTasks(), destTaskIndices);
+ isDataMovementEvent = false;
+ case DATA_MOVEMENT_EVENT:
+ Event event = tezEvent.getEvent();
+ TezTaskAttemptID sourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+ int sourceTaskIndex = sourceAttemptId.getTaskID().getId();
+ if (isDataMovementEvent) {
+ edgeManager.routeEventToDestinationTasks((DataMovementEvent) event,
+ sourceTaskIndex, destinationVertex.getTotalTasks(),
+ destTaskIndices);
+ } else {
+ edgeManager.routeEventToDestinationTasks((InputFailedEvent) event,
+ sourceTaskIndex, destinationVertex.getTotalTasks(),
+ destTaskIndices);
+ }
for(Integer destTaskIndex : destTaskIndices) {
EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT,
destinationVertex.getName(),
sourceVertex.getName(),
null); // will be filled by Task when sending the event. Is it needed?
- destMeta.setIndex(ifEvent.getTargetIndex());
+ if (isDataMovementEvent) {
+ destMeta.setIndex(((DataMovementEvent)event).getTargetIndex());
+ } else {
+ destMeta.setIndex(((InputFailedEvent)event).getTargetIndex());
+ }
tezEvent.setDestinationInfo(destMeta);
TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
sendEventToTask(destTaskId, tezEvent);
}
+ break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/305be76d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8433477..80f0a97 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1616,11 +1616,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
diagnostics.add(diag);
}
- private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
+ private static boolean isEventFromVertex(Vertex vertex,
+ EventMetaData sourceMeta) {
if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
- throw new TezUncheckedException("Bad routing of event"
- + ", Event-vertex=" + sourceMeta.getTaskVertexName()
- + ", Expected=" + vertex.getName());
+ return false;
+ }
+ return true;
+ }
+
+ private static void checkEventSourceMetadata(Vertex vertex,
+ EventMetaData sourceMeta) {
+ if (!isEventFromVertex(vertex, sourceMeta)) {
+ throw new TezUncheckedException("Bad routing of event"
+ + ", Event-vertex=" + sourceMeta.getTaskVertexName()
+ + ", Expected=" + vertex.getName());
}
}
@@ -1651,16 +1660,41 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ tezEvent.getEventType());
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
+ boolean isDataMovementEvent = true;
switch(tezEvent.getEventType()) {
+ case INPUT_FAILED_EVENT:
+ isDataMovementEvent = false;
case DATA_MOVEMENT_EVENT:
{
- checkEventSourceMetadata(vertex, sourceMeta);
- TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
- DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
- dmEvent.setVersion(srcTaId.getId());
- Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
- sourceMeta.getEdgeVertexName()));
- destEdge.sendTezEventToDestinationTasks(tezEvent);
+ if (isEventFromVertex(vertex, sourceMeta)) {
+ // event from this vertex. send to destination vertex
+ TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+ if (isDataMovementEvent) {
+ ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+ } else {
+ ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+ }
+ Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
+ Edge destEdge = vertex.targetVertices.get(destVertex);
+ if (destEdge == null) {
+ throw new TezUncheckedException("Bad destination vertex: " +
+ sourceMeta.getEdgeVertexName() + " for event vertex: " +
+ vertex.getVertexId());
+ }
+ vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
+ .getVertexId(), Collections.singletonList(tezEvent)));
+ } else {
+ // event not from this vertex. must have come from source vertex.
+ // send to tasks
+ Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+ sourceMeta.getTaskVertexName()));
+ if (srcEdge == null) {
+ throw new TezUncheckedException("Bad source vertex: " +
+ sourceMeta.getTaskVertexName() + " for destination vertex: " +
+ vertex.getVertexId());
+ }
+ srcEdge.sendTezEventToDestinationTasks(tezEvent);
+ }
}
break;
case VERTEX_MANAGER_EVENT:
@@ -1675,17 +1709,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
break;
- case INPUT_FAILED_EVENT:
- {
- checkEventSourceMetadata(vertex, sourceMeta);
- TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
- InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
- ifEvent.setVersion(srcTaId.getId());
- Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
- sourceMeta.getEdgeVertexName()));
- destEdge.sendTezEventToDestinationTasks(tezEvent);
- }
- break;
case INPUT_READ_ERROR_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);