You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/11/08 23:02:51 UTC

tez git commit: TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled. (sseth) (cherry picked from commit 67944a1dff87faf3b8bf7ecf3afb1bcda6c43dda)

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 b99d9afac -> be83286d6


TEZ-1750. Add a DAGScheduler which schedules tasks only when sources
have been scheduled. (sseth)
(cherry picked from commit 67944a1dff87faf3b8bf7ecf3afb1bcda6c43dda)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be83286d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be83286d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be83286d

Branch: refs/heads/branch-0.5
Commit: be83286d657732329f109d1c1d7c9881cfe7ac0c
Parents: b99d9af
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Nov 8 14:02:04 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Nov 8 14:02:41 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |   8 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   9 +-
 .../DAGSchedulerNaturalOrderControlled.java     | 256 +++++++++++++
 .../TestDAGSchedulerNaturalOrderControlled.java | 374 +++++++++++++++++++
 5 files changed, 645 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55c195e..348541d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ ALL CHANGES:
   TEZ-1747. Increase test timeout for TestSecureShuffle.
   TEZ-1746. Flaky test in TestVertexImpl and TestExceptionPropagation.
   TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
+  TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
 
 Release 0.5.2: 2014-11-07
 

http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index d9003b3..6873863 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -339,6 +339,12 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
       TEZ_AM_PREFIX + "client.am.port-range";
 
+  /**
+   * String value. The class to be used for DAG Scheduling. Expert level setting.
+   */
+  public static final String TEZ_AM_DAG_SCHEDULER_CLASS = TEZ_AM_PREFIX + "dag.scheduler.class";
+  public static final String TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT =
+      "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder";
 
   /** Int value. The amount of memory in MB to be used by the AppMaster */
   public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
@@ -367,7 +373,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Int value. The maximum heartbeat interval between the AM and RM in milliseconds
    * Increasing this reduces the communication between the AM and the RM and can
-   * help in scaling up. Expert level setting. Expert level setting.
+   * help in scaling up. Expert level setting.
    */
   public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
       + "am-rm.heartbeat.interval-ms.max";

http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index cddcbd5..f877eb4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -75,6 +76,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGReport;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -1333,8 +1335,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private static void assignDAGScheduler(DAGImpl dag) {
-    LOG.info("Using Natural order dag scheduler");
-    dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+    String dagSchedulerClassName = dag.conf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
+        TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
+    LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
+    dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
+        DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler});
   }
 
   private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {

http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
new file mode 100644
index 0000000..7cfbf5a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Schedules task attempts belonging to downstream vertices only after all attempts belonging to
+ * upstream vertices have been scheduled. If there's a slow start or delayed start of a particular
+ * vertex, this ensures that downstream tasks are not started before that vertex's tasks.
+ * <p>Some future enhancements:</p>
+ * - consider cluster capacity - and be more aggressive about scheduling downstream tasks before
+ * upstream tasks have completed.
+ * - generic slow start mechanism across all vertices - independent of the type of edges.
+ */
+@SuppressWarnings("rawtypes")
+public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
+
+  private static final Log LOG =
+      LogFactory.getLog(DAGSchedulerNaturalOrderControlled.class);
+
+  private final DAG dag;
+  private final EventHandler handler;
+
+  // Tracks pending events, in case they're not sent immediately.
+  private final ListMultimap<String, TaskAttemptEventSchedule> pendingEvents =
+      LinkedListMultimap.create();
+  // Tracks vertices for which no additional scheduling checks are required. Once in this set, the
+  // vertex is considered to be fully scheduled.
+  private final Set<String> scheduledVertices = new HashSet<String>();
+  // Tracks tasks scheduled for a vertex.
+  private final Map<String, BitSet> vertexScheduledTasks = new HashMap<String, BitSet>();
+
+  public DAGSchedulerNaturalOrderControlled(DAG dag, EventHandler dispatcher) {
+    this.dag = dag;
+    this.handler = dispatcher;
+  }
+
+  @Override
+  public void vertexCompleted(Vertex vertex) {
+  }
+
+  // TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput*
+  @Override
+  public void scheduleTask(DAGEventSchedulerUpdate event) {
+    TaskAttempt attempt = event.getAttempt();
+    Vertex vertex = dag.getVertex(attempt.getVertexID());
+    int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
+
+    // natural priority. Handles failures and retries.
+    int priorityLowLimit = (vertexDistanceFromRoot + 1) * 3;
+    int priorityHighLimit = priorityLowLimit - 2;
+
+    TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(
+        attempt.getID(), priorityLowLimit, priorityHighLimit);
+
+    taskAttemptSeen(vertex.getName(), attempt.getID());
+
+    if (vertexAlreadyScheduled(vertex)) {
+      // Vertex previously marked ready for scheduling.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling " + attempt.getID() + " between priorityLow: " + priorityLowLimit
+            + " and priorityHigh: " + priorityHighLimit);
+      }
+      sendEvent(attemptEvent);
+      // A new task coming in here could push us over the "enough tasks scheduled" limit.
+      processDownstreamVertices(vertex);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Attempting to schedule vertex: " + vertex.getLogIdentifier() +
+            " due to schedule event");
+      }
+      boolean scheduled = trySchedulingVertex(vertex);
+      if (scheduled) {
+        LOG.info("Scheduled vertex: " + vertex.getLogIdentifier());
+        // If ready to be scheduled, send out pending events and the current event.
+        // Send events out for this vertex first. Then try scheduling downstream vertices.
+        sendEventsForVertex(vertex.getName());
+        sendEvent(attemptEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing downstream vertices for vertex: " + vertex.getLogIdentifier());
+        }
+        processDownstreamVertices(vertex);
+      } else {
+        pendingEvents.put(vertex.getName(), attemptEvent);
+      }
+    }
+  }
+
+  private void taskAttemptSeen(String vertexName, TezTaskAttemptID taskAttemptID) {
+    BitSet scheduledTasks = vertexScheduledTasks.get(vertexName);
+    if (scheduledTasks == null) {
+      scheduledTasks = new BitSet();
+      vertexScheduledTasks.put(vertexName, scheduledTasks);
+    }
+    if (taskAttemptID != null) { // null for 0 task vertices
+      scheduledTasks.set(taskAttemptID.getTaskID().getId());
+    }
+  }
+
+  private void sendEventsForVertex(String vertexName) {
+    for (TaskAttemptEventSchedule event : pendingEvents.removeAll(vertexName)) {
+      sendEvent(event);
+    }
+  }
+
+  /* Checks whether this vertex has been marked as ready to go in the past */
+  private boolean vertexAlreadyScheduled(Vertex vertex) {
+    return scheduledVertices.contains(vertex.getName());
+  }
+
+  private boolean scheduledTasksForwarded(Vertex vertex) {
+    boolean canSchedule = false;
+    BitSet scheduledTasks = vertexScheduledTasks.get(vertex.getName());
+    if (scheduledTasks != null) {
+      if (scheduledTasks.cardinality() >= vertex.getTotalTasks()) {
+        canSchedule = true;
+      }
+    }
+    return canSchedule;
+  }
+
+  private void processDownstreamVertices(Vertex vertex) {
+    List<Vertex> newlyScheduledVertices = Lists.newLinkedList();
+    Map<Vertex, Edge> outputVertexEdgeMap = vertex.getOutputVertices();
+    for (Vertex destVertex : outputVertexEdgeMap.keySet()) {
+      if (vertexAlreadyScheduled(destVertex)) { // Nothing to do if already scheduled.
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempting to schedule vertex: " + destVertex.getLogIdentifier() +
+              " due to upstream event from " + vertex.getLogIdentifier());
+        }
+        boolean scheduled = trySchedulingVertex(destVertex);
+        if (scheduled) {
+          LOG.info("Scheduled vertex: " + destVertex.getLogIdentifier() +
+              " due to upstream event from " + vertex.getLogIdentifier());
+          sendEventsForVertex(destVertex.getName());
+          newlyScheduledVertices.add(destVertex);
+        }
+      }
+    }
+
+    // Try scheduling all downstream vertices which were scheduled in this run.
+    for (Vertex downStreamVertex : newlyScheduledVertices) {
+      processDownstreamVertices(downStreamVertex);
+    }
+  }
+
+  /* Process the specified vertex, and add it to the cache of scheduled vertices if it can be scheduled */
+  private boolean trySchedulingVertex(Vertex vertex) {
+    boolean canSchedule = true;
+    if (vertexScheduledTasks.get(vertex.getName()) == null) {
+      // No scheduled requests seen yet. Do not mark this as ready.
+      // 0 task vertices handled elsewhere.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "No schedule requests for vertex: " + vertex.getLogIdentifier() + ", Not scheduling");
+      }
+      canSchedule = false;
+    } else {
+      Map<Vertex, Edge> inputVertexEdgeMap = vertex.getInputVertices();
+      if (inputVertexEdgeMap == null || inputVertexEdgeMap.isEmpty()) {
+        // Nothing to wait for. Go ahead and schedule.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No sources for vertex: " + vertex.getLogIdentifier() + ", Scheduling now");
+        }
+      } else {
+        // Check if all sources are scheduled.
+        for (Vertex srcVertex : inputVertexEdgeMap.keySet()) {
+          if (scheduledTasksForwarded(srcVertex)) {
+            // Nothing to wait for. Go ahead and check the next source.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Trying to schedule: " + vertex.getLogIdentifier() +
+                  ", All tasks forwarded for srcVertex: " + srcVertex.getLogIdentifier() +
+                  ", count: " + srcVertex.getTotalTasks());
+            }
+          } else {
+            // Special case for vertices with 0 tasks. 0 check is sufficient since parallelism cannot increase.
+            if (srcVertex.getTotalTasks() == 0) {
+              LOG.info(
+                  "Vertex: " + srcVertex.getLogIdentifier() + " has 0 tasks. Marking as scheduled");
+              scheduledVertices.add(srcVertex.getName());
+              taskAttemptSeen(srcVertex.getName(), null);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Not all sources schedule requests complete while trying to schedule: " +
+                        vertex.getLogIdentifier() + ", For source vertex: " +
+                        srcVertex.getLogIdentifier() + ", Forwarded requests: " +
+                        (vertexScheduledTasks.get(srcVertex.getName()) == null ? "null(0)" :
+                            vertexScheduledTasks.get(srcVertex.getName()).cardinality()) +
+                        " out of " + srcVertex.getTotalTasks());
+              }
+              canSchedule = false;
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (canSchedule) {
+      scheduledVertices.add(vertex.getName());
+    }
+    return canSchedule;
+  }
+
+  @Override
+  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
+  }
+
+  @Override
+  public void taskSucceeded(DAGEventSchedulerUpdate event) {
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(TaskAttemptEventSchedule event) {
+    handler.handle(event);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
new file mode 100644
index 0000000..88a91b6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestDAGSchedulerNaturalOrderControlled {
+
+  @Test(timeout = 5000)
+  public void testSimpleFlow() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = createMockDag();
+    DAGSchedulerNaturalOrderControlled dagScheduler =
+        new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+    int numVertices = 5;
+    Vertex[] vertices = new Vertex[numVertices];
+    for (int i = 0; i < numVertices; i++) {
+      vertices[i] = dag.getVertex("vertex" + i);
+    }
+
+    // Schedule all tasks belonging to v0
+    for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule 3 tasks belonging to v2
+    for (int i = 0; i < 3; i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(3)).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule 3 tasks belonging to v3
+    for (int i = 0; i < 3; i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(3)).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule remaining tasks belonging to v2
+    for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule remaining tasks belonging to v3
+    for (int i = 3; i < vertices[3].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class));
+    reset(eventHandler);
+
+
+    // Schedule all tasks belonging to v4
+    for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+  }
+
+  @Test(timeout = 5000)
+  public void testSourceRequestDelayed() {
+    // ShuffleVertexHandler - slowstart simulation
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = createMockDag();
+    DAGSchedulerNaturalOrderControlled dagScheduler =
+        new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+    int numVertices = 5;
+    Vertex[] vertices = new Vertex[numVertices];
+    for (int i = 0; i < numVertices; i++) {
+      vertices[i] = dag.getVertex("vertex" + i);
+    }
+
+    // Schedule all tasks belonging to v0
+    for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+
+    // v2 behaving as if configured with slow-start.
+    // Schedule all tasks belonging to v3.
+    for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Scheduling all tasks belonging to v4. None should get scheduled.
+    for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+    }
+    verify(eventHandler, never()).handle(any(Event.class));
+    reset(eventHandler);
+
+    // v2 now starts scheduling ...
+    // Schedule 3 tasks for v2 initially.
+    for (int i = 0; i < 3; i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(3)).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule remaining tasks belonging to v2
+    for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
+    // The remaining v2 requests and all of the pending v4 requests should be sent out.
+    verify(eventHandler, times(vertices[2].getTotalTasks() - 3 + vertices[4].getTotalTasks()))
+        .handle(
+            args.capture());
+    int count = 0;
+    // Verify the order in which the events were sent out.
+    for (Event raw : args.getAllValues()) {
+      TaskAttemptEventSchedule event = (TaskAttemptEventSchedule) raw;
+      if (count < vertices[2].getTotalTasks() - 3) {
+        assertEquals(2, event.getTaskAttemptID().getTaskID().getVertexID().getId());
+      } else {
+        assertEquals(4, event.getTaskAttemptID().getTaskID().getVertexID().getId());
+      }
+      count++;
+    }
+    reset(eventHandler);
+  }
+
+
+  @Test(timeout = 5000)
+  public void testParallelismUpdated() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = createMockDag();
+    DAGSchedulerNaturalOrderControlled dagScheduler =
+        new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+    int numVertices = 5;
+    Vertex[] vertices = new Vertex[numVertices];
+    for (int i = 0; i < numVertices; i++) {
+      vertices[i] = dag.getVertex("vertex" + i);
+    }
+
+    // Schedule all tasks belonging to v0
+    for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+
+    assertEquals(10, vertices[2].getTotalTasks());
+
+    // v2 will change parallelism
+    // Schedule all tasks belonging to v3
+    for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule all tasks belonging to v4
+    for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+    }
+    verify(eventHandler, never()).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Reset the parallelism for v2.
+    updateParallelismOnMockVertex(vertices[2], 3);
+    assertEquals(3, vertices[2].getTotalTasks());
+
+    // Schedule all tasks belonging to v2
+    for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks()))
+        .handle(any(Event.class));
+    reset(eventHandler);
+  }
+
+  @Test(timeout = 5000)
+  public void testMultipleRequestsForSameTask() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = createMockDag();
+    DAGSchedulerNaturalOrderControlled dagScheduler =
+        new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+    int numVertices = 5;
+    Vertex[] vertices = new Vertex[numVertices];
+    for (int i = 0; i < numVertices; i++) {
+      vertices[i] = dag.getVertex("vertex" + i);
+    }
+
+    // Schedule all but 1 task belonging to v0
+    for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+    }
+    verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
+    reset(eventHandler);
+
+
+    // Schedule all tasks belonging to v2
+    for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+    }
+    // Nothing should be scheduled
+    verify(eventHandler, never()).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule an extra attempt for all but 1 task belonging to v0
+    for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
+      dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1));
+    }
+    // Only v0 requests should have gone out
+    verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
+    reset(eventHandler);
+
+    // Schedule last task of v0, with attempt 1
+    dagScheduler.scheduleTask(
+        createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1));
+    // One v0 request and all of v2 should have gone out
+    verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class));
+  }
+
+
+  // Test parallelism updated from -1
+  // Reduce parallelism
+  // Different attempts scheduled for a single task.
+
+  private DAG createMockDag() {
+    DAG dag = mock(DAG.class);
+    /*
+    v0             v1
+      \            /
+       \          /
+        v2       v3
+         \      /
+          \    /
+           \  /
+            v4
+     v0 - Root
+     v1 - Root with 0 tasks.
+     v2 - can simulate AutoReduce. Parallelism goes down. Slow schedule.
+     v3 - can simulate ImmediateStart
+     v4 - Simulate one shuffle input, one broadcast input.
+     */
+
+    int numVertices = 5;
+    Vertex[] vertices = new Vertex[numVertices];
+
+    vertices[0] = createMockVertex("vertex0", 0, 10, 0);
+    vertices[1] = createMockVertex("vertex1", 1, 0, 0);
+    vertices[2] = createMockVertex("vertex2", 2, 10, 1);
+    vertices[3] = createMockVertex("vertex3", 3, 10, 1);
+    vertices[4] = createMockVertex("vertex4", 4, 10, 2);
+
+    for (int i = 0; i < numVertices; i++) {
+      String name = vertices[i].getName();
+      TezVertexID vertexId = vertices[i].getVertexId();
+      doReturn(vertices[i]).when(dag).getVertex(name);
+      doReturn(vertices[i]).when(dag).getVertex(vertexId);
+    }
+
+
+    updateMockVertexWithConnections(vertices[0], createConnectionMap(null),
+        createConnectionMap(vertices[2]));
+    updateMockVertexWithConnections(vertices[1], createConnectionMap(null),
+        createConnectionMap(vertices[3]));
+    updateMockVertexWithConnections(vertices[2], createConnectionMap(vertices[0]),
+        createConnectionMap(vertices[4]));
+    updateMockVertexWithConnections(vertices[3], createConnectionMap(vertices[1]),
+        createConnectionMap(vertices[4]));
+    updateMockVertexWithConnections(vertices[4], createConnectionMap(vertices[2], vertices[3]),
+        createConnectionMap(null));
+
+    return dag;
+  }
+
+
+  private void updateParallelismOnMockVertex(Vertex vertex, int newParallelism) {
+    doReturn(newParallelism).when(vertex).getTotalTasks();
+  }
+
+  private Vertex createMockVertex(String name, int vertexIdInt, int totalTasks,
+                                  int distanceFromRoot) {
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId = TezVertexID.getInstance(dagId, vertexIdInt);
+
+    Vertex vertex = mock(Vertex.class);
+    doReturn(name).when(vertex).getName();
+    doReturn(totalTasks).when(vertex).getTotalTasks();
+    doReturn(vertexId).when(vertex).getVertexId();
+    doReturn(distanceFromRoot).when(vertex).getDistanceFromRoot();
+    doReturn(vertexId + " [" + name + "]").when(vertex).getLogIdentifier();
+    return vertex;
+  }
+
+  private Map<Vertex, Edge> createConnectionMap(Vertex... vertices) {
+    Map<Vertex, Edge> map = new HashMap<Vertex, Edge>();
+    if (vertices != null) {
+      for (Vertex vertex : vertices) {
+        map.put(vertex, mock(Edge.class));
+      }
+    }
+    return map;
+  }
+
+  private void updateMockVertexWithConnections(Vertex mockVertex, Map<Vertex, Edge> sources,
+                                               Map<Vertex, Edge> destinations) {
+    doReturn(sources).when(mockVertex).getInputVertices();
+    doReturn(destinations).when(mockVertex).getOutputVertices();
+  }
+
+  private TaskAttempt createTaskAttempt(TezVertexID vertexId, int taskIdInt, int attemptIdInt) {
+    TaskAttempt taskAttempt = mock(TaskAttempt.class);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIdInt);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, attemptIdInt);
+    doReturn(taskAttemptId).when(taskAttempt).getID();
+    doReturn(vertexId).when(taskAttempt).getVertexID();
+    return taskAttempt;
+  }
+
+  private DAGEventSchedulerUpdate createScheduleRequest(TezVertexID vertexId, int taskIdInt,
+                                                        int attemptIdInt) {
+    TaskAttempt mockAttempt = createTaskAttempt(vertexId, taskIdInt, attemptIdInt);
+    return new DAGEventSchedulerUpdate(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);
+  }
+
+}