You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/14 04:41:49 UTC

tez git commit: TEZ-2599. Dont send obsoleted data movement events to tasks (bikas) (cherry picked from commit aca83090ed2580d093a1fec59217d78f2e575a6c)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 282f4e13e -> f5c87c1ef


TEZ-2599. Dont send obsoleted data movement events to tasks (bikas)
(cherry picked from commit aca83090ed2580d093a1fec59217d78f2e575a6c)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5c87c1e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5c87c1e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5c87c1e

Branch: refs/heads/branch-0.7
Commit: f5c87c1ef0da3a92190e41eabad701bcb2fd551c
Parents: 282f4e1
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Jul 4 01:17:24 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Dec 13 19:39:42 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  51 +++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 134 +++++++++++++++++++
 .../java/org/apache/tez/test/TestInput.java     |   4 +-
 4 files changed, 181 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f6f028c..50d0b67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,6 @@
 Apache Tez Change Log
 =====================
 
-Release 0.7.1: Unreleased
-
 INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
   TEZ-2949. Allow duplicate dag names within session for Tez.
@@ -11,6 +9,7 @@ ALL CHANGES
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
   TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
   TEZ-2995. Timeline primary filter should only be on callerId and not type.
+  TEZ-2599. Dont send obsoleted data movement events to tasks
   TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
   reduce and slow start
   TEZ-2956. Handle auto-reduce parallelism when the

http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 2f2b0c2..cf24348 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -799,6 +799,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     final TezEvent tezEvent;
     final Edge eventEdge;
     final int eventTaskIndex;
+    boolean isObsolete = false;
     EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
       this.tezEvent = tezEvent;
       this.eventEdge = eventEdge;
@@ -4224,12 +4225,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
               + " vertex: " + getLogIdentifier());
           boolean isFirstEvent = true;
+          boolean firstEventObsoleted = false;
           for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
             boolean earlyExit = false;
             if (events.size() == maxEvents) {
               break;
             }
             EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
+            if (eventInfo.isObsolete) {
+              // ignore obsolete events
+              firstEventObsoleted = true;
+              continue;
+            }
             TezEvent tezEvent = eventInfo.tezEvent;
             switch(tezEvent.getEventType()) {
             case INPUT_FAILED_EVENT:
@@ -4240,11 +4247,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 Edge srcEdge = eventInfo.eventEdge;
                 PendingEventRouteMetadata pendingRoute = null;
                 if (isFirstEvent) {
-                  // do this precondition check only for the first event
+                  // the first event is the one that can have pending routes because its expanded
+                  // events had not been completely sent in the last round.
                   isFirstEvent = false;
                   pendingRoute = srcEdge.removePendingEvents(attemptID);
                   if (pendingRoute != null) {
-                    Preconditions.checkState(tezEvent == pendingRoute.getTezEvent()); // same object
+                    // the first event must match the pending route event
+                    // the only reason it may not match is if in between rounds that event got
+                    // obsoleted
+                    if(tezEvent != pendingRoute.getTezEvent()) {
+                      Preconditions.checkState(firstEventObsoleted);
+                      // pending routes can be ignored for obsoleted events
+                      pendingRoute = null;
+                    }
                   }
                 }
                 if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
@@ -4384,12 +4399,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                     getLogIdentifier());
               }
               if (srcEdge.hasOnDemandRouting()) {
-                onDemandRouteEventsWriteLock.lock();
-                try {
-                  onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
-                } finally {
-                  onDemandRouteEventsWriteLock.unlock();
-                }
+                processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
               } else {
                 // send to tasks            
                 srcEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -4525,6 +4535,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
     }
   }
+  
+  private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
+    onDemandRouteEventsWriteLock.lock();
+    try {
+      onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
+      if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+        for (EventInfo eventInfo : onDemandRouteEvents) {
+          if (eventInfo.eventEdge == srcEdge 
+              && eventInfo.tezEvent.getSourceInfo().getTaskAttemptID().equals(
+                 tezEvent.getSourceInfo().getTaskAttemptID())
+              && (eventInfo.tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT
+                  || eventInfo.tezEvent
+                      .getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+            // an INPUT_FAILED_EVENT supersedes all earlier (composite) data
+            // movement events of the same source edge and task attempt (IDs
+            // are compared per attempt, not per task). Entries are flagged,
+            // not removed, because readers index this list by event id.
+            eventInfo.isObsolete = true;
+          }
+        }
+      }
+    } finally {
+      onDemandRouteEventsWriteLock.unlock();
+    }
+  }
 
   private static class InternalErrorTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {

http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d08d011..fe540e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -183,6 +183,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
@@ -191,6 +192,7 @@ import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
 import org.apache.tez.test.VertexManagerPluginForTest.VertexManagerPluginForTestConfig;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -2787,6 +2789,13 @@ public class TestVertexImpl {
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
     
+    // ask for events with sufficient buffer. get all events in a single shot.
+    fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(11, fromEventId);
+    Assert.assertEquals(11, eventInfo.getEvents().size());
+    
     // change max events to larger value. max events does not evenly divide total events
     fromEventId = 0;
     for (int i=1; i<=2; ++i) {
@@ -2816,6 +2825,131 @@ public class TestVertexImpl {
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
   }
+  
+  @Test (timeout = 5000)
+  public void testVertexGetTAAttemptsObsoletion() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+    VertexImpl v2 = vertices.get("vertex2");
+    startVertex(v2);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+    
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v4.getTotalTasks(); ++i) {
+      taskList.add(ScheduleTaskRequest.create(i, null));
+    }
+    v4.scheduleTasks(taskList);
+    Assert.assertEquals(VertexState.RUNNING, v4.getState());
+    Assert.assertEquals(1, v4.sourceVertices.size());
+    Edge e = v4.sourceVertices.get(v3);
+    TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+    TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+    
+    for (int i=0; i<11; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all 11 DM events were queued in the on-demand route events list
+    Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+    
+    TaskAttemptEventInfo eventInfo;
+    EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+    EventRouteMetadata mockRoute = EventRouteMetadata.create(1, new int[]{0});
+    e.edgeManager = mockPlugin;
+    when(
+        mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+            anyInt())).thenReturn(mockRoute);
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    
+    // send an input failed event
+    v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+        new TezEvent(InputFailedEvent.create(0, 0), 
+            new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+    // ask for events with sufficient buffer. get only input failed event. all DM events obsoleted
+    int fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(12, fromEventId);
+    Assert.assertEquals(1, eventInfo.getEvents().size());
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+  }
+  
+  @Test (timeout = 5000)
+  public void testVertexGetTAAttemptsObsoletionWithPendingRoutes() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+    VertexImpl v2 = vertices.get("vertex2");
+    startVertex(v2);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v4.getTotalTasks(); ++i) {
+      taskList.add(ScheduleTaskRequest.create(i, null));
+    }
+    
+    v4.scheduleTasks(taskList);
+    Assert.assertEquals(VertexState.RUNNING, v4.getState());
+    Assert.assertEquals(1, v4.sourceVertices.size());
+    Edge e = v4.sourceVertices.get(v3);
+    TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+    TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+    
+    for (int i=0; i<11; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all 11 DM events were queued in the on-demand route events list
+    Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+    
+    TaskAttemptEventInfo eventInfo;
+    EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+    EventRouteMetadata mockFailedRoute = EventRouteMetadata.create(1, new int[]{0});
+    e.edgeManager = mockPlugin;
+    when(
+        mockPlugin.routeInputSourceTaskFailedEventToDestination(anyInt(),
+            anyInt())).thenReturn(mockFailedRoute);
+
+    // expand each DM event into 2 routed events so they don't evenly fit in maxEvents=5
+    EventRouteMetadata mockRoute = EventRouteMetadata.create(2, new int[]{0, 0});    
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    
+    int fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(2, fromEventId); // events 0-1 fully sent (4); event 2 sent 1 of 2, rest pending
+    Assert.assertEquals(5, eventInfo.getEvents().size());
+    
+    // send an input failed event
+    v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+        new TezEvent(InputFailedEvent.create(0, 0), 
+            new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+
+    // only the input failed event is returned; all DM events (incl. event 2 with pending routes) are obsoleted
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
+    fromEventId = eventInfo.getNextFromEventId();
+    Assert.assertEquals(12, fromEventId);
+    Assert.assertEquals(1, eventInfo.getEvents().size());
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+  }
 
   @Test(timeout = 5000)
   public void testVertexReconfigurePlannedAfterInit() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/f5c87c1e/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index ec27a45..a4f3c27 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -360,7 +360,9 @@ public class TestInput extends AbstractLogicalInput {
             dmEvent.getUserPayload().getInt();
       } else if (event instanceof InputFailedEvent) {
         InputFailedEvent ifEvent = (InputFailedEvent) event;
-        numCompletedInputs--;
+        if (this.completedInputVersion[ifEvent.getTargetIndex()] == ifEvent.getVersion()) {
+          numCompletedInputs--;
+        }
         LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
             " numInputs: " + getNumPhysicalInputs() +