You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/27 00:07:58 UTC

git commit: TEZ-715. Auto Reduce Parallelism can rarely trigger NPE in AM at DAGAppMaster.handle(DAGAppMaster.java:1268) (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master f2f31e0ef -> a9bace052


TEZ-715. Auto Reduce Parallelism can rarely trigger NPE in AM at DAGAppMaster.handle(DAGAppMaster.java:1268) (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a9bace05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a9bace05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a9bace05

Branch: refs/heads/master
Commit: a9bace052a9dba4b1aa08c2e9310df73928dad84
Parents: f2f31e0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 26 15:07:30 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 26 15:07:30 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/app/dag/Task.java   |  2 -
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 12 ----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 62 +++++++++-----------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 42 +++++++++++++
 4 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 1d25930..8243b70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -78,8 +78,6 @@ public interface Task {
   public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int maxEvents);
   
-  public List<TezEvent> getAndClearTaskTezEvents();
-
   public List<String> getDiagnostics();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 368406a..793c12a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -492,18 +492,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   @Override
-  public List<TezEvent> getAndClearTaskTezEvents() {
-    readLock.lock();
-    try {
-      List<TezEvent> events = tezEventsForTaskAttempts;
-      tezEventsForTaskAttempts = new ArrayList<TezEvent>();
-      return events;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
   public List<String> getDiagnostics() {
     List<String> diagnostics = new ArrayList<String>(attempts.size());
     readLock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a9139ab..d3e07cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -462,7 +462,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Set<String> inputsWithInitializers;
   private int numInitializedInputs;
   private boolean startSignalPending = false;
-  List<TezEvent> pendingRouteEvents = null;
+  private boolean tasksNotYetScheduled = true;
+  // We may always store task events in the vertex for scalability
+  List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
+  List<TezEvent> pendingRouteEventsWhileIniting = null;
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
   private RootInputInitializerRunner rootInputInitializer;
@@ -803,6 +806,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public void scheduleTasks(List<Integer> taskIDs) {
     readLock.lock();
     try {
+      tasksNotYetScheduled = false;
+      if (!pendingTaskEvents.isEmpty()) {
+        VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+            new VertexEventRouteEvent(getVertexId(), pendingTaskEvents));
+        pendingTaskEvents.clear();
+      }
       for (Integer taskID : taskIDs) {
         if (tasks.size() <= taskID.intValue()) {
           throw new TezUncheckedException(
@@ -874,10 +883,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           edge.startEventBuffering();
         }
   
-        // Use a set since the same event may have been sent to multiple tasks
-        // and we want to avoid duplicates
-        Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-  
         LOG.info("Vertex " + getVertexId() + 
             " parallelism set to " + parallelism + " from " + numTasks);
         // assign to local variable of LinkedHashMap to make sure that changing
@@ -896,7 +901,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                     + " for vertex: " + getVertexId() + " name: " + getName());
             return false;
           }
-          pendingEvents.addAll(task.getAndClearTaskTezEvents());
           if (i <= parallelism) {
             continue;
           }
@@ -913,7 +917,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 + entry.getKey() + " destination: " + getVertexId());
             Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
             Edge edge = sourceVertices.get(sourceVertex);
-            EdgeProperty edgeProperty = edge.getEdgeProperty();
             try {
               edge.setCustomEdgeManager(entry.getValue());
             } catch (Exception e) {
@@ -926,21 +929,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
         }
   
-        // Re-route all existing TezEvents according to new routing table
-        // At this point only events attributed to source task attempts can be
-        // re-routed. e.g. DataMovement or InputFailed events.
-        // This assumption is fine for now since these tasks haven't been started.
-        // So they can only get events generated from source task attempts that
-        // have already been started.
-        DAG dag = getDAG();
-        for(TezEvent event : pendingEvents) {
-          TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
-              .getTaskID().getVertexID();
-          Vertex sourceVertex = dag.getVertex(sourceVertexId);
-          Edge sourceEdge = sourceVertices.get(sourceVertex);
-          sourceEdge.sendTezEventToDestinationTasks(event);
-        }
-  
         // stop buffering events
         for (Edge edge : sourceVertices.values()) {
           edge.stopEventBuffering();
@@ -1550,10 +1538,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
     // Vertex will be moving to INITED state, safe to process pending route events.
-    if (pendingRouteEvents != null) {
+    if (pendingRouteEventsWhileIniting != null) {
       VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
-          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
-      pendingRouteEvents = null;
+          new VertexEventRouteEvent(getVertexId(), pendingRouteEventsWhileIniting));
+      pendingRouteEventsWhileIniting = null;
     }
     return vertexState;
   }
@@ -2032,12 +2020,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent re = (VertexEventRouteEvent) event;
-      if (vertex.pendingRouteEvents == null) {
-        vertex.pendingRouteEvents = Lists.newLinkedList();
+      if (vertex.pendingRouteEventsWhileIniting == null) {
+        vertex.pendingRouteEventsWhileIniting = Lists.newLinkedList();
       }
       // Store the events for post-init routing, since INIT state is when
       // initial task parallelism will be set
-      vertex.pendingRouteEvents.addAll(re.getEvents());
+      vertex.pendingRouteEventsWhileIniting.addAll(re.getEvents());
     }
   }
 
@@ -2103,14 +2091,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             } else {
               // event not from this vertex. must have come from source vertex.
               // send to tasks
-              Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
-                  sourceMeta.getTaskVertexName()));
-              if (srcEdge == null) {
-                throw new TezUncheckedException("Bad source vertex: " +
-                    sourceMeta.getTaskVertexName() + " for destination vertex: " +
-                    vertex.getVertexId());
+              if (vertex.tasksNotYetScheduled) {
+                vertex.pendingTaskEvents.add(tezEvent);
+              } else {
+                Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+                    sourceMeta.getTaskVertexName()));
+                if (srcEdge == null) {
+                  throw new TezUncheckedException("Bad source vertex: " +
+                      sourceMeta.getTaskVertexName() + " for destination vertex: " +
+                      vertex.getVertexId());
+                }
+                srcEdge.sendTezEventToDestinationTasks(tezEvent);
               }
-              srcEdge.sendTezEventToDestinationTasks(tezEvent);
             }
           }
           break;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0bf2b8c..1a14b15 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -98,6 +98,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
@@ -114,12 +115,16 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.test.EdgeManagerForTest;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1419,6 +1424,43 @@ public class TestVertexImpl {
     Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
 
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexPendingTaskEvents() {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v2 = vertices.get("vertex2");
+    
+    startVertex(v2);
+    startVertex(v3);
+    
+    TezTaskID t0_v2 = TezTaskID.getInstance(v2.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v2 = TezTaskAttemptID.getInstance(t0_v2, 0);
+
+    List<TezEvent> taskEvents = Lists.newLinkedList();
+    TezEvent tezEvent1 = new TezEvent(
+        new CompositeDataMovementEvent(0, 1, new byte[0]), 
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
+    TezEvent tezEvent2 = new TezEvent(
+        new DataMovementEvent(0, new byte[0]), 
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
+    taskEvents.add(tezEvent1);
+    taskEvents.add(tezEvent2);
+    // send events and test that they are buffered until some task is scheduled
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
+    dispatcher.await();
+    Assert.assertEquals(2, v3.pendingTaskEvents.size());
+    v3.scheduleTasks(Collections.singletonList(new Integer(0)));
+    dispatcher.await();
+    Assert.assertEquals(0, v3.pendingTaskEvents.size());
+    // send events and test that they are not buffered anymore
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
+    dispatcher.await();
+    Assert.assertEquals(0, v3.pendingTaskEvents.size());
+  }
 
   @Test(timeout = 5000)
   public void testSetCustomEdgeManager() throws UnsupportedEncodingException {