You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/14 04:41:49 UTC
tez git commit: TEZ-2599. Dont send obsoleted data movement events to
tasks (bikas) (cherry picked from commit
aca83090ed2580d093a1fec59217d78f2e575a6c)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 282f4e13e -> f5c87c1ef
TEZ-2599. Dont send obsoleted data movement events to tasks (bikas)
(cherry picked from commit aca83090ed2580d093a1fec59217d78f2e575a6c)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5c87c1e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5c87c1e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5c87c1e
Branch: refs/heads/branch-0.7
Commit: f5c87c1ef0da3a92190e41eabad701bcb2fd551c
Parents: 282f4e1
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Jul 4 01:17:24 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Dec 13 19:39:42 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 +++++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 134 +++++++++++++++++++
.../java/org/apache/tez/test/TestInput.java | 4 +-
4 files changed, 181 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f6f028c..50d0b67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,6 @@
Apache Tez Change Log
=====================
-Release 0.7.1: Unreleased
-
INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
TEZ-2949. Allow duplicate dag names within session for Tez.
@@ -11,6 +9,7 @@ ALL CHANGES
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
TEZ-2995. Timeline primary filter should only be on callerId and not type.
+ TEZ-2599. Dont send obsoleted data movement events to tasks
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
reduce and slow start
TEZ-2956. Handle auto-reduce parallelism when the
http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 2f2b0c2..cf24348 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -799,6 +799,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
final TezEvent tezEvent;
final Edge eventEdge;
final int eventTaskIndex;
+ boolean isObsolete = false;
EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
this.tezEvent = tezEvent;
this.eventEdge = eventEdge;
@@ -4224,12 +4225,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
+ " vertex: " + getLogIdentifier());
boolean isFirstEvent = true;
+ boolean firstEventObsoleted = false;
for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
boolean earlyExit = false;
if (events.size() == maxEvents) {
break;
}
EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
+ if (eventInfo.isObsolete) {
+ // ignore obsolete events
+ firstEventObsoleted = true;
+ continue;
+ }
TezEvent tezEvent = eventInfo.tezEvent;
switch(tezEvent.getEventType()) {
case INPUT_FAILED_EVENT:
@@ -4240,11 +4247,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
Edge srcEdge = eventInfo.eventEdge;
PendingEventRouteMetadata pendingRoute = null;
if (isFirstEvent) {
- // do this precondition check only for the first event
+ // the first event is the one that can have pending routes because its expanded
+ // events had not been completely sent in the last round.
isFirstEvent = false;
pendingRoute = srcEdge.removePendingEvents(attemptID);
if (pendingRoute != null) {
- Preconditions.checkState(tezEvent == pendingRoute.getTezEvent()); // same object
+ // the first event must match the pending route event
+ // the only reason it may not match is if in between rounds that event got
+ // obsoleted
+ if(tezEvent != pendingRoute.getTezEvent()) {
+ Preconditions.checkState(firstEventObsoleted);
+ // pending routes can be ignored for obsoleted events
+ pendingRoute = null;
+ }
}
}
if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
@@ -4384,12 +4399,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
getLogIdentifier());
}
if (srcEdge.hasOnDemandRouting()) {
- onDemandRouteEventsWriteLock.lock();
- try {
- onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
- } finally {
- onDemandRouteEventsWriteLock.unlock();
- }
+ processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
} else {
// send to tasks
srcEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -4525,6 +4535,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
}
+
+ private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
+ onDemandRouteEventsWriteLock.lock();
+ try {
+ onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
+ if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+ for (EventInfo eventInfo : onDemandRouteEvents) {
+ if (eventInfo.eventEdge == srcEdge
+ && eventInfo.tezEvent.getSourceInfo().getTaskAttemptID().equals(
+ tezEvent.getSourceInfo().getTaskAttemptID())
+ && (eventInfo.tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT
+ || eventInfo.tezEvent
+ .getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+ // An input failed event obsoletes every data movement event
+ // (plain or composite) already recorded for the same source
+ // edge and source task attempt; flag those entries so that
+ // readers checking isObsolete can skip them.
+ eventInfo.isObsolete = true;
+ }
+ }
+ }
+ } finally {
+ onDemandRouteEventsWriteLock.unlock();
+ }
+ }
private static class InternalErrorTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d08d011..fe540e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -183,6 +183,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
@@ -191,6 +192,7 @@ import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
import org.apache.tez.test.VertexManagerPluginForTest.VertexManagerPluginForTestConfig;
import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
@@ -2787,6 +2789,13 @@ public class TestVertexImpl {
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
+ // ask for events with sufficient buffer. get all events in a single shot.
+ fromEventId = 0;
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals(11, fromEventId);
+ Assert.assertEquals(11, eventInfo.getEvents().size());
+
// change max events to larger value. max events does not evenly divide total events
fromEventId = 0;
for (int i=1; i<=2; ++i) {
@@ -2816,6 +2825,131 @@ public class TestVertexImpl {
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
}
+
+ @Test (timeout = 5000)
+ public void testVertexGetTAAttemptsObsoletion() throws Exception {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(v1);
+ VertexImpl v2 = vertices.get("vertex2");
+ startVertex(v2);
+ VertexImpl v3 = vertices.get("vertex3");
+ VertexImpl v4 = vertices.get("vertex4");
+
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
+ // schedule every v4 task so that edge routing begins
+ for (int i=0; i<v4.getTotalTasks(); ++i) {
+ taskList.add(ScheduleTaskRequest.create(i, null));
+ }
+ v4.scheduleTasks(taskList);
+ Assert.assertEquals(VertexState.RUNNING, v4.getState());
+ Assert.assertEquals(1, v4.sourceVertices.size());
+ Edge e = v4.sourceVertices.get(v3);
+ TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+ TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+
+ for (int i=0; i<11; ++i) {
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(DataMovementEvent.create(0, null),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+ }
+ dispatcher.await();
+ // verify that all 11 events are present in the on-demand route events list
+ Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+
+ TaskAttemptEventInfo eventInfo;
+ EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+ EventRouteMetadata mockRoute = EventRouteMetadata.create(1, new int[]{0});
+ e.edgeManager = mockPlugin;
+ when(
+ mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+ anyInt())).thenReturn(mockRoute);
+ when(
+ mockPlugin.routeDataMovementEventToDestination(anyInt(),
+ anyInt(), anyInt())).thenReturn(mockRoute);
+
+ // send an input failed event from the same source task attempt as the DM events
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(InputFailedEvent.create(0, 0),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+ // ask for events with sufficient buffer. only the input failed event is returned; all 11 DM events are obsoleted
+ int fromEventId = 0;
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals(12, fromEventId);
+ Assert.assertEquals(1, eventInfo.getEvents().size());
+ Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+ }
+
+ @Test (timeout = 5000)
+ public void testVertexGetTAAttemptsObsoletionWithPendingRoutes() throws Exception {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(v1);
+ VertexImpl v2 = vertices.get("vertex2");
+ startVertex(v2);
+ VertexImpl v3 = vertices.get("vertex3");
+ VertexImpl v4 = vertices.get("vertex4");
+
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
+ // schedule every v4 task so that edge routing begins
+ for (int i=0; i<v4.getTotalTasks(); ++i) {
+ taskList.add(ScheduleTaskRequest.create(i, null));
+ }
+
+ v4.scheduleTasks(taskList);
+ Assert.assertEquals(VertexState.RUNNING, v4.getState());
+ Assert.assertEquals(1, v4.sourceVertices.size());
+ Edge e = v4.sourceVertices.get(v3);
+ TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+ TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+
+ for (int i=0; i<11; ++i) {
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(DataMovementEvent.create(0, null),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+ }
+ dispatcher.await();
+ // verify that all 11 events are present in the on-demand route events list
+ Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+
+ TaskAttemptEventInfo eventInfo;
+ EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+ EventRouteMetadata mockFailedRoute = EventRouteMetadata.create(1, new int[]{0});
+ e.edgeManager = mockPlugin;
+ when(
+ mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+ anyInt())).thenReturn(mockFailedRoute);
+
+ // route each DM event to two events so that they don't evenly fit in the max size of 5
+ EventRouteMetadata mockRoute = EventRouteMetadata.create(2, new int[]{0, 0});
+ when(
+ mockPlugin.routeDataMovementEventToDestination(anyInt(),
+ anyInt(), anyInt())).thenReturn(mockRoute);
+
+ int fromEventId = 0;
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals(2, fromEventId); // events 0-1 expanded and fit, event 2 has pending routes
+ Assert.assertEquals(5, eventInfo.getEvents().size());
+
+ // send an input failed event from the same source task attempt as the DM events
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(InputFailedEvent.create(0, 0),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+ // resume from event 2. only the input failed event is returned; the remaining DM events are obsoleted
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals(12, fromEventId);
+ Assert.assertEquals(1, eventInfo.getEvents().size());
+ Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+ }
@Test(timeout = 5000)
public void testVertexReconfigurePlannedAfterInit() throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index ec27a45..a4f3c27 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -360,7 +360,9 @@ public class TestInput extends AbstractLogicalInput {
dmEvent.getUserPayload().getInt();
} else if (event instanceof InputFailedEvent) {
InputFailedEvent ifEvent = (InputFailedEvent) event;
- numCompletedInputs--;
+ if (this.completedInputVersion[ifEvent.getTargetIndex()] == ifEvent.getVersion()) {
+ numCompletedInputs--;
+ }
LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() +
" version: " + ifEvent.getVersion() +
" numInputs: " + getNumPhysicalInputs() +