You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/12 03:59:43 UTC

[1/2] tez git commit: TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha) (cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 4d44c5854 -> 1db5d1878


TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)
(cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)

Conflicts:
	CHANGES.txt
	tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ff7930c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ff7930c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ff7930c

Branch: refs/heads/branch-0.7
Commit: 4ff7930c049421f4b46d017f73b90c3e8cde4efc
Parents: 4d44c58
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 11 18:06:35 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../vertexmanager/ShuffleVertexManager.java     | 15 +++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
 3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04076ae..fd2749e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2956. Handle auto-reduce parallelism when the
+  totalNumBipartiteSourceTasks is 0
   TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
   instead of tasks
   TEZ-2824. Add javadocs for Vertex.setConf and DAG.setConf.

http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 308579b..9fb5d1e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -570,12 +570,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
    */
   @VisibleForTesting
   boolean determineParallelismAndApply() {
-    if(numBipartiteSourceTasksCompleted == 0) {
-      return true;
-    }
-    
     if(numVertexManagerEventsReceived == 0) {
-      return true;
+      if (totalNumBipartiteSourceTasks > 0) {
+        return true;
+      }
     }
     
     int currentParallelism = pendingTasks.size();
@@ -598,8 +596,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return false;
     }
 
-    long expectedTotalSourceTasksOutputSize =
-        (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    long expectedTotalSourceTasksOutputSize = 0;
+    if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
+      expectedTotalSourceTasksOutputSize =
+          (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    }
 
     int desiredTaskParallelism = 
         (int)(

http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df08060..f3f3444 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -180,7 +180,8 @@ public class TestShuffleVertexManager {
 
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) throws Exception {
-          when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+          final int numTasks = ((Integer)invocation.getArguments()[0]).intValue();
+          when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
           newEdgeManagers.clear();
           for (Entry<String, EdgeProperty> entry :
               ((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
@@ -210,7 +211,7 @@ public class TestShuffleVertexManager {
 
               @Override
               public int getDestinationVertexNumTasks() {
-                return 2;
+                return numTasks;
               }
             };
             EdgeManagerPlugin edgeManager = ReflectionUtils
@@ -220,7 +221,7 @@ public class TestShuffleVertexManager {
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
           return null;
-      }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+      }}).when(mockContext).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
@@ -238,12 +239,15 @@ public class TestShuffleVertexManager {
     // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
+    verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
     scheduledTasks.clear();
     // TODO TEZ-1714 locking verify(mockContext, times(1)).vertexManagerDone(); // notified after scheduling all tasks
 
     // check scheduling only after onVertexStarted
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
@@ -254,13 +258,16 @@ public class TestShuffleVertexManager {
     // trigger start and processing of pending notification events
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
 
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
     VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
     // parallelism not change due to large data size
@@ -274,11 +281,13 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());


[2/2] tez git commit: TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas) (cherry picked from commit 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d)

Posted by bi...@apache.org.
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas)
(cherry picked from commit 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d)

Conflicts:
	CHANGES.txt
	tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
	tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1db5d187
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1db5d187
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1db5d187

Branch: refs/heads/branch-0.7
Commit: 1db5d18788a32bebc8bad3e1a125f27db7fbefc4
Parents: 4ff7930
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Dec 9 17:00:17 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 11 18:21:58 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../vertexmanager/ShuffleVertexManager.java     | 150 ++++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 193 ++++++++++++-------
 3 files changed, 214 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd2749e..aced775 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
+  reduce and slow start
   TEZ-2956. Handle auto-reduce parallelism when the
   totalNumBipartiteSourceTasks is 0
   TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts

http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 9fb5d1e..01dc5a0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -154,12 +154,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     EdgeProperty edgeProperty;
     boolean vertexIsConfigured;
     BitSet finishedTaskSet;
+    int numTasks;
+    int numVMEventsReceived;
+    long outputSize;
 
     SourceVertexInfo(EdgeProperty edgeProperty) {
       this.edgeProperty = edgeProperty;
-      if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-        finishedTaskSet = new BitSet();
-      }
+      finishedTaskSet = new BitSet();
+    }
+    
+    int getNumTasks() {
+      return numTasks;
+    }
+    
+    int getNumCompletedTasks() {
+      return finishedTaskSet.cardinality();
     }
   }
 
@@ -453,6 +462,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
       srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
       // TODO what if derived class has already called this
+      // register for status update from all source vertices
       getContext().registerForVertexStateUpdates(entry.getKey(),
           EnumSet.of(VertexState.CONFIGURED));
       if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
@@ -469,7 +479,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     // track the tasks in this vertex
     updatePendingTasks();
-    updateSourceTaskCount();
     
     LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
              " with " + totalNumBipartiteSourceTasks + " source tasks and " +
@@ -489,19 +498,20 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
     String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
     int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
-    updateSourceTaskCount();
     SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
-
-    if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-      //handle duplicate events for bipartite sources
-      BitSet completedSourceTasks = srcInfo.finishedTaskSet;
-      if (completedSourceTasks != null) {
-        // duplicate notifications tracking
-        if (!completedSourceTasks.get(srcTaskId)) {
-          completedSourceTasks.set(srcTaskId);
-          // source task has completed
-          ++numBipartiteSourceTasksCompleted;
-        }
+    if (srcInfo.vertexIsConfigured) {
+      Preconditions.checkState(srcTaskId < srcInfo.numTasks,  
+          "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName +
+          " has only " + srcInfo.numTasks + " tasks");
+    }
+    //handle duplicate events and count task completions from all source vertices
+    BitSet completedSourceTasks = srcInfo.finishedTaskSet;
+    // duplicate notifications tracking
+    if (!completedSourceTasks.get(srcTaskId)) {
+      completedSourceTasks.set(srcTaskId);
+      // source task has completed
+      if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+        numBipartiteSourceTasksCompleted++;
       }
     }
     schedulePendingTasks();
@@ -516,9 +526,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       LOG.info("Ignoring vertex manager event from: " + producerTask);
       return;
     }
-    
+
+    String vName = producerTask.getVertexIdentifier().getName();
+    SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
+    Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask);
+
     numVertexManagerEventsReceived++;
 
+    long sourceTaskOutputSize = 0;
     if (vmEvent.getUserPayload() != null) {
       // save output size
       VertexManagerEventPayloadProto proto;
@@ -527,15 +542,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       } catch (InvalidProtocolBufferException e) {
         throw new TezUncheckedException(e);
       }
-      long sourceTaskOutputSize = proto.getOutputSize();
+      sourceTaskOutputSize = proto.getOutputSize();
+
+      srcInfo.numVMEventsReceived++;
+      srcInfo.outputSize += sourceTaskOutputSize;
       completedSourceTasksOutputSize += sourceTaskOutputSize;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Received info of output size: " + sourceTaskOutputSize 
-            + " numInfoReceived: " + numVertexManagerEventsReceived
-            + " total output size: " + completedSourceTasksOutputSize);
-      }
     }
     
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier()
+          + " received info of output size: " + sourceTaskOutputSize
+          + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived
+          + " vertex output size: " + srcInfo.outputSize
+          + " total numEventsReceived: " + numVertexManagerEventsReceived
+          + " total output size: " + completedSourceTasksOutputSize);
+    }
   }
   
   void updatePendingTasks() {
@@ -554,22 +575,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     });
   }
 
-  void updateSourceTaskCount() {
-    // track source vertices
-    int numSrcTasks = 0;
-    Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
-    for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
-      numSrcTasks += getContext().getVertexNumTasks(entry.getKey());
-    }
-    totalNumBipartiteSourceTasks = numSrcTasks;
-  }
-
   /**
    * Compute optimal parallelism needed for the job
    * @return true (if parallelism is determined), false otherwise
    */
   @VisibleForTesting
-  boolean determineParallelismAndApply() {
+  boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) {
     if(numVertexManagerEventsReceived == 0) {
       if (totalNumBipartiteSourceTasks > 0) {
         return true;
@@ -585,21 +596,28 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
      */
     boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
         desiredTaskInputDataSize)
-        && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+        && (minSourceVertexCompletedTaskFraction < slowStartMaxSrcCompletionFraction);
     if (canDetermineParallelismLater) {
       LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
           + ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks
           + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
           + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
-          + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + ", maxThreshold="
-          + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+          + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted
+          + ", minSourceVertexCompletedTaskFraction=" + minSourceVertexCompletedTaskFraction);
       return false;
     }
 
+    // Change this to use per partition stats for more accuracy TEZ-2962.
+    // Instead of aggregating overall size and then dividing equally - coalesce partitions until 
+    // desired per partition size is achieved.
     long expectedTotalSourceTasksOutputSize = 0;
-    if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
-      expectedTotalSourceTasksOutputSize =
-          (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+      SourceVertexInfo srcInfo = vInfo.getValue();
+      if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
+        // this assumes that 1 vmEvent is received per completed task - TEZ-2961
+        expectedTotalSourceTasksOutputSize += 
+            (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+      }
     }
 
     int desiredTaskParallelism = 
@@ -667,8 +685,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     return true;
   }
-  
-  void schedulePendingTasks(int numTasksToSchedule) {
+
+  void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
     // determine parallelism before scheduling the first time
     // this is the latest we can wait before determining parallelism.
     // currently this depends on task completion and so this is the best time
@@ -677,7 +695,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // calculating parallelism or change parallelism while tasks are already
     // running then we can create other parameters to trigger this calculation.
     if(enableAutoParallelism && !parallelismDetermined) {
-      parallelismDetermined = determineParallelismAndApply();
+      parallelismDetermined = determineParallelismAndApply(minSourceVertexCompletedTaskFraction);
       if (!parallelismDetermined) {
         //try to determine parallelism later when more info is available.
         return;
@@ -707,10 +725,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
    */
   boolean canScheduleTasks() {
     for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
-      String sourceVertex = entry.getKey();
-      int numSourceTasks = getContext().getVertexNumTasks(sourceVertex);
-      if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) {
-        // vertex not configured
+      // need to check that the vertex is configured because until then we don't know if numTasks==0 is valid
+      if (!entry.getValue().vertexIsConfigured) { // isConfigured
+        // source vertex is not configured yet
         if (LOG.isDebugEnabled()) {
           LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: "
               + getContext().getVertexName());
@@ -744,29 +761,38 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
           " remaining tasks for vertex: " + getContext().getVertexName());
-      schedulePendingTasks(numPendingTasks);
+      schedulePendingTasks(numPendingTasks, 1);
       return;
     }
 
-    float completedSourceTaskFraction = 0f;
-    if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks
-      completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted / totalNumBipartiteSourceTasks;
-    } else {
-      completedSourceTaskFraction = 1;
+    float minSourceVertexCompletedTaskFraction = 1f;
+    String minCompletedVertexName = "";
+    for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+      SourceVertexInfo srcInfo = vInfo.getValue();
+      // canScheduleTasks check has already verified all sources are configured
+      Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " + vInfo.getKey());
+      if (srcInfo.numTasks > 0) {
+        int numCompletedTasks = srcInfo.getNumCompletedTasks();
+        float completedFraction = (float) numCompletedTasks / srcInfo.numTasks;
+        if (minSourceVertexCompletedTaskFraction > completedFraction) {
+          minSourceVertexCompletedTaskFraction = completedFraction;
+          minCompletedVertexName = vInfo.getKey();
+        }
+      }
     }
 
     // start scheduling when source tasks completed fraction is more than min.
     // linearly increase the number of scheduled tasks such that all tasks are 
     // scheduled when source tasks completed fraction reaches max
-    float tasksFractionToSchedule = 1; 
+    float tasksFractionToSchedule = 1;
     float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
     if (percentRange > 0) {
       tasksFractionToSchedule = 
-            (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
+            (minSourceVertexCompletedTaskFraction - slowStartMinSrcCompletionFraction)/
             percentRange;
     } else {
       // min and max are equal. schedule 100% on reaching min
-      if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+      if(minSourceVertexCompletedTaskFraction < slowStartMinSrcCompletionFraction) {
         tasksFractionToSchedule = 0;
       }
     }
@@ -784,10 +810,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
                getContext().getVertexName() + " with totalTasks: " +
                totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted +
                " source tasks completed out of " + totalNumBipartiteSourceTasks +
-               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
+               ". MinSourceTaskCompletedFraction: " + minSourceVertexCompletedTaskFraction +
+               " in Vertex: " + minCompletedVertexName +
                " min: " + slowStartMinSrcCompletionFraction + 
                " max: " + slowStartMaxSrcCompletionFraction);
-      schedulePendingTasks(numTasksToSchedule);
+      schedulePendingTasks(numTasksToSchedule, minSourceVertexCompletedTaskFraction);
     }
   }
 
@@ -857,8 +884,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName()); 
     Preconditions.checkState(vInfo.vertexIsConfigured == false);
     vInfo.vertexIsConfigured = true;
+    vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName());
+    if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+      totalNumBipartiteSourceTasks += vInfo.numTasks;
+    }
     LOG.info("Received configured notification : " + stateUpdate.getVertexState() + " for vertex: "
-      + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+      + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName() + 
+      " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks);
     schedulePendingTasks();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index f3f3444..9a9ff27 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -236,10 +236,12 @@ public class TestShuffleVertexManager {
     
     // check waiting for notification before scheduling
     Assert.assertFalse(manager.pendingTasks.isEmpty());
-    // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
+    // source vertices have 0 tasks. triggers scheduling
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
-    verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+    verify(mockContext, times(1)).reconfigureVertex(eq(1), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
@@ -252,13 +254,15 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
     // normally this event will not come before onVertexStarted() is called
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
     // trigger start and processing of pending notification events
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
-    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+    verify(mockContext, times(2)).reconfigureVertex(eq(1), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -269,30 +273,31 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
-    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1);
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     manager.onVertexManagerEventReceived(vmEvent);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
     // TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks
-    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
 
     /**
@@ -304,41 +309,42 @@ public class TestShuffleVertexManager {
     //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //First task in src1 completed with small payload
-    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
     manager.onVertexManagerEventReceived(vmEvent); //small payload
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
 
-    //Second task in src1 completed with small payload
-    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+    //First task in src2 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
-    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
-    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
 
     //First task in src2 completed (with larger payload) to trigger determining parallelism
-    vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent);
-    Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+    Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined
+    verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(1, manager.pendingTasks.size());
     Assert.assertEquals(1, scheduledTasks.size());
@@ -351,10 +357,11 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
     scheduledTasks.clear();
-    vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
 
     //min/max fraction of 0.0/0.2
     manager = createManager(conf, mockContext, 0.0f, 0.2f);
+    // initial invocation count == 3
+    verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
@@ -363,21 +370,25 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
     //send 7 events with payload size as 100
-    for(int i=0;i<7;i++) {
-      manager.onVertexManagerEventReceived(vmEvent); //small payload
+    for(int i=0;i<8;i++) {
+      //small payload - create new event each time or it will be ignored (from same task)
+      manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1));
       manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
       //should not change parallelism
-      verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+      verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    }
+    for(int i=0;i<3;i++) {
+      manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
+      verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     }
-    //send 8th event with payload size as 100
-    manager.onVertexManagerEventReceived(vmEvent);
-
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
     //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
-    verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
+    // parallelism updated
+    verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    // check exact update value - 8 events of 100 each (800 total), extrapolated to 20 tasks => 2000 => 2 tasks (at 1000 per task)
+    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
 
     //reset context for next test
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -399,7 +410,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
@@ -413,14 +424,15 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
-    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent);
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
 
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     // managedVertex tasks reduced
-    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -433,7 +445,7 @@ public class TestShuffleVertexManager {
     
     // more completions dont cause recalculation of parallelism
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -502,7 +514,8 @@ public class TestShuffleVertexManager {
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-    
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
+
     // fail if there is no bipartite src vertex
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
@@ -537,6 +550,9 @@ public class TestShuffleVertexManager {
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -572,8 +588,9 @@ public class TestShuffleVertexManager {
     }
 
     // source vertex have some tasks. min > default and max undefined
-    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
-    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+    int numTasks = 20;
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
     scheduledTasks.clear();
 
     manager = createManager(conf, mockContext, 0.8f, null);
@@ -581,18 +598,17 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+
     Assert.assertEquals(3, manager.pendingTasks.size());
-    Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
-    float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
-    int completedTasks = 0;
+    float completedTasksThreshold = 0.8f * numTasks;
     // Finish all tasks before exceeding the threshold
     for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
       for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
-        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
-        ++completedTasks;
-        if ((completedTasks + 1) >= completedTasksThreshold) {
+        // the 0th task of each source vertex is completed after the loop, so start from task i+1
+        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
+        if ((i + 2) >= completedTasksThreshold) {
           // stop before completing more than min/max source tasks
           break;
         }
@@ -603,7 +619,10 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
 
     // Cross the threshold min/max threshold to schedule all tasks
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size());
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(0, manager.pendingTasks.size());
     Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
 
@@ -614,13 +633,13 @@ public class TestShuffleVertexManager {
     // source vertex have some tasks. min, max == 0
     manager = createManager(conf, mockContext, 0.f, 0.f);
     manager.onVertexStarted(emptyCompletions);
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
     // all source vertices need to be configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     
@@ -637,10 +656,15 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
+    // task completion on only 1 SG edge does nothing
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     
     // min, max > 0 and min == max == absolute max 1.0
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
@@ -696,6 +720,10 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
+    // reset vertices for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
+
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
@@ -703,26 +731,30 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     // completion of same task again should not get counted
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 2);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
     scheduledTasks.clear();
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
 
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 1.0f);
@@ -731,20 +763,24 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 1);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
 
   }
 
@@ -798,7 +834,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
-    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
 
@@ -821,11 +857,11 @@ public class TestShuffleVertexManager {
 
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
     //Send events for all tasks of m3.
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
@@ -833,23 +869,34 @@ public class TestShuffleVertexManager {
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
-    //Send an event for m2. But still we need to wait for at least 1 event from r1.
+    //Send events for m2. But still we need to wait for at least 1 event from r1.
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
+
+    // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
     //Ensure that setVertexParallelism is not called for R2.
     verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
         anyMap());
+
+    //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+    when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
+
     // complete configuration of r1 triggers the scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
     verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class),
         anyMap());
   
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
-    Assert.assertTrue(scheduledTasks.size() == 3);
+    Assert.assertTrue(scheduledTasks.size() == 1);
 
     //try with zero task vertices
     scheduledTasks.clear();
@@ -863,14 +910,13 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
     manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
-    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
     // Only need completed configuration notification from m3
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
@@ -989,7 +1035,7 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
-    //Send an event for m2.
+    //Send an event for m3.
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1047,8 +1093,9 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 0);
 
-    // event from m3 triggers scheduling. no need for m2 since it has 0 tasks
+    // event from m3 triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
 
@@ -1065,6 +1112,8 @@ public class TestShuffleVertexManager {
 
     //Send 1 events for tasks of r1.
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);