You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/01/20 18:05:32 UTC

[06/50] [abbrv] tez git commit: TEZ-2914. Ability to limit vertex concurrency (bikas)

TEZ-2914. Ability to limit vertex concurrency (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/34eb75d7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/34eb75d7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/34eb75d7

Branch: refs/heads/TEZ-2980
Commit: 34eb75d709bbe6e0417f9b4023f4fa1cec81bd8b
Parents: 12bd908
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Dec 25 19:39:04 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 25 19:39:04 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  12 +-
 .../apache/tez/dag/app/dag/DAGScheduler.java    |  68 +++++++++-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +-
 .../app/dag/event/DAGEventSchedulerUpdate.java  |   3 +-
 .../DAGEventSchedulerUpdateTAAssigned.java      |  36 ------
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  17 +--
 .../app/dag/impl/DAGSchedulerNaturalOrder.java  |  15 +--
 .../DAGSchedulerNaturalOrderControlled.java     |  15 +--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  43 ++-----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   6 +
 .../tez/dag/app/rm/TaskSchedulerManager.java    |   2 -
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  13 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |  50 ++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   4 -
 .../tez/dag/app/dag/impl/TestDAGScheduler.java  | 127 ++++++++++++++++++-
 .../TestDAGSchedulerNaturalOrderControlled.java |  38 +++---
 .../dag/app/rm/TestTaskSchedulerManager.java    |   6 +-
 18 files changed, 306 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a3b0fa6..25cfd86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-2914. Ability to limit vertex concurrency
   TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
   TEZ-3006. Remove unused import in TestHistoryParser.
   TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ).

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index b707857..9f7777f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -537,7 +537,17 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX +
       "max.app.attempts";
   public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
-  
+
+  /**
+   * Int value. The maximum number of attempts that can run concurrently for a given vertex.
+   * Setting <=0 implies no limit
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY =
+      TEZ_AM_PREFIX + "vertex.max-task-concurrency";
+  public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1;
+
   /**
    * Int value. The maximum number of attempts that can fail for a particular task before the task is failed. 
    * This does not count killed attempts. Task failure results in DAG failure.

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 2d3b006..87a6261 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -18,16 +18,70 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.Map;
+import java.util.Queue;
+
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.records.TezVertexID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
-public interface DAGScheduler {
+public abstract class DAGScheduler {
+  private static class VertexInfo {
+    int concurrencyLimit;
+    int concurrency;
+    Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList();
+    
+    VertexInfo(int limit) {
+      this.concurrencyLimit = limit;
+    }
+  }
   
-  public void vertexCompleted(Vertex vertex);
+  Map<TezVertexID, VertexInfo> vertexInfo = null;
   
-  public void scheduleTask(DAGEventSchedulerUpdate event);
+  public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
+    if (vertexInfo == null) {
+      vertexInfo = Maps.newHashMap();
+    }
+    if (concurrency > 0) {
+      vertexInfo.put(vId, new VertexInfo(concurrency));
+    }
+  }
   
-  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event);
-
-  public void taskSucceeded(DAGEventSchedulerUpdate event);
+  public void scheduleTask(DAGEventSchedulerUpdate event) {
+    VertexInfo vInfo = null;
+    if (vertexInfo != null) {
+      vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+    }
+    scheduleTaskWithLimit(event, vInfo);
+  }
+  
+  private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
+    if (vInfo != null) {
+      if (vInfo.concurrency >= vInfo.concurrencyLimit) {
+        vInfo.pendingAttempts.add(event);
+        return; // already at max concurrency
+      }
+      vInfo.concurrency++;
+    }
+    scheduleTaskEx(event);
+  }
+  
+  public void taskCompleted(DAGEventSchedulerUpdate event) {
+    taskCompletedEx(event);
+    if (vertexInfo != null) {
+      VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+      if (vInfo != null) {
+        vInfo.concurrency--;
+        if (!vInfo.pendingAttempts.isEmpty()) {
+          scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo);
+        }
+      }
+    }
+  }
+  
+  public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event);
+  
+  public abstract void taskCompletedEx(DAGEventSchedulerUpdate event);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9fc73a2..54f2ffa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -85,7 +85,7 @@ public interface Vertex extends Comparable<Vertex> {
    */
   TezCounters getCachedCounters();
 
-
+  int getMaxTaskConcurrency();
   Map<TezTaskID, Task> getTasks();
   Task getTask(TezTaskID taskID);
   Task getTask(int taskIndex);

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
index a436a8c..eda02b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
@@ -24,8 +24,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent {
   
   public enum UpdateType {
     TA_SCHEDULE,
-    TA_SCHEDULED,
-    TA_SUCCEEDED
+    TA_COMPLETED
   }
   
   private final TaskAttempt attempt;

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
deleted file mode 100644
index 8e27843..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-
-public class DAGEventSchedulerUpdateTAAssigned extends DAGEventSchedulerUpdate {
-  
-  final Container container;
-
-  public DAGEventSchedulerUpdateTAAssigned(TaskAttempt attempt, Container container) {
-    super(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULED, attempt);
-    this.container = container;
-  }
-
-  public Container getContainer() {
-    return container;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3d47450..60f933f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -104,7 +104,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
@@ -1592,6 +1591,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
     dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
         DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler});
+    for (Vertex v : dag.vertices.values()) {
+      dag.dagScheduler.addVertexConcurrencyLimit(v.getVertexId(), v.getMaxTaskConcurrency());
+    }
   }
 
   private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
@@ -1903,10 +1905,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
-        if (!job.reRunningVertices.contains(vertex.getVertexId())) {
-          // vertex succeeded for the first time
-          job.dagScheduler.vertexCompleted(vertex);
-        }
         forceTransitionToKillWait = !(job.vertexSucceeded(vertex));
       }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
@@ -2146,13 +2144,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         case TA_SCHEDULE:
           dag.dagScheduler.scheduleTask(sEvent);
           break;
-        case TA_SCHEDULED:
-          DAGEventSchedulerUpdateTAAssigned taEvent =
-                                (DAGEventSchedulerUpdateTAAssigned) sEvent;
-          dag.dagScheduler.taskScheduled(taEvent);
-          break;
-        case TA_SUCCEEDED:
-          dag.dagScheduler.taskSucceeded(sEvent);
+        case TA_COMPLETED:
+          dag.dagScheduler.taskCompleted(sEvent);
           break;
         default:
           throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 8d42227..4246ad0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -26,11 +26,10 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 
 @SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrder implements DAGScheduler {
+public class DAGSchedulerNaturalOrder extends DAGScheduler {
   
   private static final Logger LOG = 
                             LoggerFactory.getLogger(DAGSchedulerNaturalOrder.class);
@@ -44,11 +43,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
   }
   
   @Override
-  public void vertexCompleted(Vertex vertex) {
-  }
-
-  @Override
-  public void scheduleTask(DAGEventSchedulerUpdate event) {
+  public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
     Vertex vertex = dag.getVertex(attempt.getVertexID());
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -69,11 +64,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
   }
   
   @Override
-  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
-  }
-
-  @Override
-  public void taskSucceeded(DAGEventSchedulerUpdate event) {
+  public void taskCompletedEx(DAGEventSchedulerUpdate event) {
   }
   
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 2469a2f..0802dce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
@@ -50,7 +49,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
  * - generic slow start mechanism across all vertices - independent of the type of edges.
  */
 @SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
+public class DAGSchedulerNaturalOrderControlled extends DAGScheduler {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class);
@@ -72,13 +71,9 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
     this.handler = dispatcher;
   }
 
-  @Override
-  public void vertexCompleted(Vertex vertex) {
-  }
-
   // TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput*
   @Override
-  public void scheduleTask(DAGEventSchedulerUpdate event) {
+  public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
     Vertex vertex = dag.getVertex(attempt.getVertexID());
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -241,11 +236,7 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
   }
 
   @Override
-  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
-  }
-
-  @Override
-  public void taskSucceeded(DAGEventSchedulerUpdate event) {
+  public void taskCompletedEx(DAGEventSchedulerUpdate event) {
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 0f76a63..c00d674 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -825,40 +825,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
       TaskAttemptStateInternal attemptState) {
     this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
-  }
-
-  // TODO: Recovery
-  /*
-  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
-    TaskFinishedEvent tfe =
-      new TaskFinishedEvent(task.taskId,
-        task.successfulAttempt,
-        task.getFinishTime(task.successfulAttempt),
-        task.taskId.getTaskType(),
-        taskState.toString(),
-        task.getCounters());
-    return tfe;
-  }
-
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) {
-    StringBuilder errorSb = new StringBuilder();
-    if (diag != null) {
-      for (String d : diag) {
-        errorSb.append(", ").append(d);
-      }
+    if (getInternalState() != TaskStateInternal.SUCCEEDED) {
+      sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action
     }
-    TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
-        TypeConverter.fromYarn(task.taskId),
-     // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
-        task.getFinishTime(taId),
-        TypeConverter.fromYarn(task.getType()),
-        errorSb.toString(),
-        taskState.toString(),
-        taId == null ? null : TypeConverter.fromYarn(taId));
-    return taskFailedEvent;
   }
-  */
 
+  private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
+    // send notification to DAG scheduler
+    eventHandler.handle(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, attempts.get(taId)));
+  }
+  
   private static void unSucceed(TaskImpl task) {
     task.commitAttempt = null;
     task.successfulAttempt = null;
@@ -1105,10 +1082,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
               .getID(), diagnostics, errCause));
         }
       }
-      // send notification to DAG scheduler
-      task.eventHandler.handle(new DAGEventSchedulerUpdate(
-          DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts
-              .get(task.successfulAttempt)));
       return task.finished(TaskStateInternal.SUCCEEDED);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 93baa0a..065974e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1184,6 +1184,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       readLock.unlock();
     }
   }
+  
+  @Override
+  public int getMaxTaskConcurrency() {
+    return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, 
+        TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT);
+  }
 
   public VertexStats getVertexStats() {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index dbf8e38..f688b57 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -69,7 +69,6 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -569,7 +568,6 @@ public class TaskSchedulerManager extends AbstractService implements
       sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
           event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
     }
-    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
         event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
             .getContainerContext().getCredentials(), event.getPriority()));

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index bc7fa98..b322e05 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -83,7 +83,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-@SuppressWarnings("unchecked")
 public class MockDAGAppMaster extends DAGAppMaster {
   
   private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class);
@@ -95,6 +94,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   EventsDelegate eventsDelegate;
   CountersDelegate countersDelegate;
   StatisticsDelegate statsDelegate;
+  ContainerDelegate containerDelegate;
   long launcherSleepTime = 1;
   boolean doSleep = true;
   int handlerConcurrency = 1;
@@ -115,6 +115,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
   public static interface EventsDelegate {
     public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
   }
+  
+  public static interface ContainerDelegate {
+    public void stop(ContainerStopRequest event);
+    public void launch(ContainerLaunchRequest event);
+  }
 
   // mock container launcher does not launch real tasks.
   // Upon, launch of a container is simulates the container asking for tasks
@@ -268,6 +273,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
     void stop(ContainerStopRequest event) {
       // remove from simulated container list
       containers.remove(event.getContainerId());
+      if (containerDelegate != null) {
+        containerDelegate.stop(event);
+      }
       getContext().containerStopRequested(event.getContainerId());
     }
 
@@ -277,6 +285,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
           event.getContainerLaunchContext());
       containers.put(event.getContainerId(), cData);
       containersToProcess.add(cData);
+      if (containerDelegate != null) {
+        containerDelegate.launch(event);
+      }
       getContext().containerLaunched(event.getContainerId());
     }
     

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index b0bc571..d5ee67d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
@@ -73,6 +74,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
+import org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
@@ -100,6 +102,8 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -406,6 +410,52 @@ public class TestMockDAGAppMaster {
 
     tezClient.stop();
   }
+  
+  @Test (timeout = 100000)
+  public void testConcurrencyLimit() throws Exception {
+    // the test relies on local mode behavior of launching a new container per task.
+    // so task concurrency == container concurrency
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    final int concurrencyLimit = 5;
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false, concurrencyLimit*4, 1000);
+
+    tezClient.start();
+    
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    
+    final AtomicInteger concurrency = new AtomicInteger(0);
+    final AtomicBoolean exceededConcurrency = new AtomicBoolean(false);
+    mockApp.containerDelegate = new ContainerDelegate() {
+      @Override
+      public void stop(ContainerStopRequest event) {
+        concurrency.decrementAndGet();
+      }
+      @Override
+      public void launch(ContainerLaunchRequest event) {
+        int maxConc = concurrency.incrementAndGet();
+        if (maxConc > concurrencyLimit) {
+          exceededConcurrency.set(true);
+        }
+        System.out.println("Launched: " + maxConc);
+      }
+    };
+    DAG dag = DAG.create("testConcurrencyLimit");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf(
+        TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, String.valueOf(concurrencyLimit));
+    dag.addVertex(vA);
+
+    mockLauncher.startScheduling(true);
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    Assert.assertFalse(exceededConcurrency.get());
+    tezClient.stop();
+  }
+
 
   @Test (timeout = 10000)
   public void testBasicCounters() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1809230..2158368 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1617,7 +1617,6 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
     Assert.assertEquals(1, dag.numCompletedVertices);
-    verify(dag.dagScheduler, times(1)).vertexCompleted(v);
     
     dispatcher.getEventHandler().handle(
         new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
@@ -1634,9 +1633,6 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dag.getSuccessfulVertices());
     Assert.assertEquals(1, dag.numCompletedVertices);
     
-    // re-completion is not notified again
-    verify(dag.dagScheduler, times(1)).vertexCompleted(v);
-
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 913f5fa..a28f367 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -25,27 +25,34 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 import static org.mockito.Mockito.*;
 
+import java.util.List;
+
 public class TestDAGScheduler {
 
   class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> {
     TaskAttemptEventSchedule event;
+    List<TaskAttemptEventSchedule> events = Lists.newLinkedList();
     @Override
     public void handle(TaskAttemptEventSchedule event) {
       this.event = event;
+      this.events.add(event);
     }
-    
   }
   
-  MockEventHandler mockEventHandler = new MockEventHandler();
   
   @Test(timeout=5000)
   public void testDAGSchedulerNaturalOrder() {
+    MockEventHandler mockEventHandler = new MockEventHandler();
     DAG mockDag = mock(DAG.class);
     Vertex mockVertex = mock(Vertex.class);
     TaskAttempt mockAttempt = mock(TaskAttempt.class);
@@ -58,15 +65,125 @@ public class TestDAGScheduler {
     
     DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
         mockEventHandler);
-    scheduler.scheduleTask(event);
+    scheduler.scheduleTaskEx(event);
     Assert.assertEquals(1, mockEventHandler.event.getPriorityHighLimit());
     Assert.assertEquals(3, mockEventHandler.event.getPriorityLowLimit());
-    scheduler.scheduleTask(event);
+    scheduler.scheduleTaskEx(event);
     Assert.assertEquals(4, mockEventHandler.event.getPriorityHighLimit());
     Assert.assertEquals(6, mockEventHandler.event.getPriorityLowLimit());
-    scheduler.scheduleTask(event);
+    scheduler.scheduleTaskEx(event);
     Assert.assertEquals(7, mockEventHandler.event.getPriorityHighLimit());
     Assert.assertEquals(9, mockEventHandler.event.getPriorityLowLimit());
   }
   
+  @Test(timeout=5000)
+  public void testConcurrencyLimit() {
+    MockEventHandler mockEventHandler = new MockEventHandler();
+    DAG mockDag = mock(DAG.class);
+    TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+    TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
+    TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
+    TezTaskID tId1 = TezTaskID.getInstance(vId1, 0);
+    
+    TaskAttempt mockAttempt;
+
+    Vertex mockVertex = mock(Vertex.class);
+    when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+    when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+    
+    DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+        mockEventHandler);
+    scheduler.addVertexConcurrencyLimit(vId0, 0); // not effective
+    
+    // schedule beyond limit and it gets scheduled
+    mockAttempt = mock(TaskAttempt.class);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(1, mockEventHandler.events.size());
+    mockAttempt = mock(TaskAttempt.class);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(2, mockEventHandler.events.size());
+    mockAttempt = mock(TaskAttempt.class);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(3, mockEventHandler.events.size());
+    
+    mockEventHandler.events.clear();
+    List<TaskAttempt> mockAttempts = Lists.newArrayList();
+    int completed = 0;
+    int requested = 0;
+    int scheduled = 0;
+    scheduler.addVertexConcurrencyLimit(vId1, 2); // effective    
+    
+    // schedule beyond limit and it gets buffered
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+    
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+    
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+    scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+
+    scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+
+    scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no extra scheduling
+
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+
+  }
+  
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
index bc86761..63137c7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
@@ -63,35 +63,35 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v0
     for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule 3 tasks belonging to v2
     for (int i = 0; i < 3; i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     verify(eventHandler, times(3)).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule 3 tasks belonging to v3
     for (int i = 0; i < 3; i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
     }
     verify(eventHandler, times(3)).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule remaining tasks belonging to v2
     for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule remaining tasks belonging to v3
     for (int i = 3; i < vertices[3].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class));
     reset(eventHandler);
@@ -99,7 +99,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v4
     for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
@@ -122,7 +122,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v0
     for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
@@ -130,14 +130,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
     // v2 behaving as if configured with slow-start.
     // Schedule all tasks belonging to v3.
     for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
 
     // Scheduling all tasks belonging to v4. None should get scheduled.
     for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
     }
     verify(eventHandler, never()).handle(any(Event.class));
     reset(eventHandler);
@@ -145,14 +145,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
     // v2 now starts scheduling ...
     // Schedule 3 tasks for v2 initially.
     for (int i = 0; i < 3; i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     verify(eventHandler, times(3)).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule remaining tasks belonging to v2
     for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
     // All of v2 and v3 should be sent out.
@@ -190,7 +190,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v0
     for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
@@ -200,14 +200,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
     // v2 will change parallelism
     // Schedule all tasks belonging to v3
     for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule all tasks belonging to v4
     for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
     }
     verify(eventHandler, never()).handle(any(Event.class));
     reset(eventHandler);
@@ -218,7 +218,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v2
     for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks()))
         .handle(any(Event.class));
@@ -241,7 +241,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all but 1 task belonging to v0
     for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
     }
     verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
     reset(eventHandler);
@@ -249,7 +249,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule all tasks belonging to v2
     for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
     }
     // Nothing should be scheduled
     verify(eventHandler, never()).handle(any(Event.class));
@@ -257,14 +257,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
 
     // Schedule an extra attempt for all but 1 task belonging to v0
     for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
-      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1));
+      dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 1));
     }
     // Only v0 requests should have gone out
     verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
     reset(eventHandler);
 
     // Schedule last task of v0, with attempt 1
-    dagScheduler.scheduleTask(
+    dagScheduler.scheduleTaskEx(
         createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1));
     // One v0 request and all of v2 should have gone out
     verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class));

http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 4db51b9..8e4e4f0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -191,10 +191,10 @@ public class TestTaskSchedulerManager {
         new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
             priority, containerContext, 0, 0, 0);
     schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
-    assertEquals(2, mockEventHandler.events.size());
-    assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
+    assertEquals(1, mockEventHandler.events.size());
+    assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA);
     AMContainerEventAssignTA assignEvent =
-        (AMContainerEventAssignTA) mockEventHandler.events.get(1);
+        (AMContainerEventAssignTA) mockEventHandler.events.get(0);
     assertEquals(priority, assignEvent.getPriority());
     assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
   }