You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/07 07:03:21 UTC
tez git commit: TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master d5a0f39d9 -> 02870f0ac
TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/02870f0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/02870f0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/02870f0a
Branch: refs/heads/master
Commit: 02870f0ac1095d67a85a864860b0c4ce68a1db57
Parents: d5a0f39
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 7 13:03:07 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 7 13:03:07 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 35 +++++---------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 8 +++++
.../app/TestTaskAttemptListenerImplTezDag.java | 21 +++++++-----
4 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58648e4..7feefcc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent
TEZ-2424. Bump up max counter group name length limit to account for per_io counters.
TEZ-2417. Tez UI: Counters are blank in the Attempts page if all attempts failed
TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333
http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d96da83..b38081b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -423,12 +423,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+ // Route TASK_STATUS_UPDATE_EVENT directly to the TaskAttempt, and route all other events
+ // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+ // to VertexImpl, in order to guarantee the following event ordering:
+ // 1. DataMovementEvent is logged as a RecoveryEvent before TaskAttemptFinishedEvent
+ // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
final EventType eventType = tezEvent.getEventType();
- if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
- eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
- context.getEventHandler()
- .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+ if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+ TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+ (TaskStatusUpdateEvent) tezEvent.getEvent());
+ context.getEventHandler().handle(taskAttemptEvent);
} else {
otherEvents.add(tezEvent);
}
@@ -453,28 +458,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
- private TaskAttemptEvent getTaskAttemptEventFromTezEvent(TezTaskAttemptID taskAttemptID,
- TezEvent tezEvent) {
- final EventType eventType = tezEvent.getEventType();
- TaskAttemptEvent taskAttemptEvent;
- switch (eventType) {
- case TASK_STATUS_UPDATE_EVENT:
- {
- taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
- (TaskStatusUpdateEvent) tezEvent.getEvent());
- }
- break;
- case TASK_ATTEMPT_COMPLETED_EVENT:
- {
- taskAttemptEvent = new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE);
- }
- break;
- default:
- throw new TezUncheckedException("unknown event type " + eventType);
- }
- return taskAttemptEvent;
- }
-
private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource> ylrs)
throws IOException {
Map<String, TezLocalResource> tlrs = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9ed7441..5d61642 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -132,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -4131,6 +4132,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
);
}
break;
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ {
+ checkEventSourceMetadata(vertex, sourceMeta);
+ vertex.getEventHandler().handle(
+ new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
+ }
+ break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ec4f99a..f974f40 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -53,6 +54,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -179,8 +181,9 @@ public class TestTaskAttemptListenerImplTezDag {
@Test (timeout = 5000)
public void testTaskEventRouting() throws Exception {
List<TezEvent> events = Arrays.asList(
- new TezEvent(InputInitializerEvent.create("test_vertex", "test_input", null), null),
- new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null)
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+ new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
+ new TezEvent(new TaskAttemptCompletedEvent(), null)
);
EventHandler eventHandler = generateHeartbeat(events);
@@ -193,13 +196,15 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
statusUpdateEvent.getType());
-
final Event vertexEvent = argAllValues.get(1);
final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
- assertEquals("Other events should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+ assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
vertexEvent.getType());
- assertEquals(EventType.ROOT_INPUT_INITIALIZER_EVENT,
+ assertEquals(EventType.DATA_MOVEMENT_EVENT,
vertexRouteEvent.getEvents().get(0).getEventType());
+ assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+ vertexRouteEvent.getEvents().get(1).getEventType());
+
}
@Test (timeout = 5000)
@@ -213,9 +218,9 @@ public class TestTaskAttemptListenerImplTezDag {
verify(eventHandler, times(1)).handle(arg.capture());
final List<Event> argAllValues = arg.getAllValues();
- final Event statusUpdateEvent = argAllValues.get(0);
- assertEquals("only event should be task done", TaskAttemptEventType.TA_DONE,
- statusUpdateEvent.getType());
+ final Event event = argAllValues.get(0);
+ assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+ event.getType());
}
private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException {