You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/12 03:59:43 UTC
[1/2] tez git commit: TEZ-2956. Handle auto-reduce parallelism when
the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)
(cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 4d44c5854 -> 1db5d1878
TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)
(cherry picked from commit c4487f966a81c01ed061d502a397e2cf3b4bce44)
Conflicts:
CHANGES.txt
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ff7930c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ff7930c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ff7930c
Branch: refs/heads/branch-0.7
Commit: 4ff7930c049421f4b46d017f73b90c3e8cde4efc
Parents: 4d44c58
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 11 18:06:35 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../vertexmanager/ShuffleVertexManager.java | 15 +++++++------
.../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04076ae..fd2749e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2956. Handle auto-reduce parallelism when the
+ totalNumBipartiteSourceTasks is 0
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
instead of tasks
TEZ-2824. Add javadocs for Vertex.setConf and DAG.setConf.
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 308579b..9fb5d1e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -570,12 +570,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
@VisibleForTesting
boolean determineParallelismAndApply() {
- if(numBipartiteSourceTasksCompleted == 0) {
- return true;
- }
-
if(numVertexManagerEventsReceived == 0) {
- return true;
+ if (totalNumBipartiteSourceTasks > 0) {
+ return true;
+ }
}
int currentParallelism = pendingTasks.size();
@@ -598,8 +596,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
return false;
}
- long expectedTotalSourceTasksOutputSize =
- (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+ long expectedTotalSourceTasksOutputSize = 0;
+ if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
+ expectedTotalSourceTasksOutputSize =
+ (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+ }
int desiredTaskParallelism =
(int)(
http://git-wip-us.apache.org/repos/asf/tez/blob/4ff7930c/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df08060..f3f3444 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -180,7 +180,8 @@ public class TestShuffleVertexManager {
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) throws Exception {
- when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+ final int numTasks = ((Integer)invocation.getArguments()[0]).intValue();
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
newEdgeManagers.clear();
for (Entry<String, EdgeProperty> entry :
((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
@@ -210,7 +211,7 @@ public class TestShuffleVertexManager {
@Override
public int getDestinationVertexNumTasks() {
- return 2;
+ return numTasks;
}
};
EdgeManagerPlugin edgeManager = ReflectionUtils
@@ -220,7 +221,7 @@ public class TestShuffleVertexManager {
newEdgeManagers.put(entry.getKey(), edgeManager);
}
return null;
- }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ }}).when(mockContext).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
// check initialization
manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
@@ -238,12 +239,15 @@ public class TestShuffleVertexManager {
// source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.isEmpty());
+ verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
- Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
scheduledTasks.clear();
// TODO TEZ-1714 locking verify(mockContext, times(1)).vertexManagerDone(); // notified after scheduling all tasks
// check scheduling only after onVertexStarted
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
verify(mockContext, times(3)).vertexReconfigurationPlanned();
// source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
@@ -254,13 +258,16 @@ public class TestShuffleVertexManager {
// trigger start and processing of pending notification events
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
- Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+ Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
// parallelism not change due to large data size
@@ -274,11 +281,13 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
- verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ (VertexLocationHint.class), anyMap());
verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
[2/2] tez git commit: TEZ-2943. Change shuffle vertex manager to use
per vertex data for auto reduce and slow start (bikas) (cherry picked from
commit 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d)
Posted by bi...@apache.org.
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas)
(cherry picked from commit 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d)
Conflicts:
CHANGES.txt
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1db5d187
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1db5d187
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1db5d187
Branch: refs/heads/branch-0.7
Commit: 1db5d18788a32bebc8bad3e1a125f27db7fbefc4
Parents: 4ff7930
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Dec 9 17:00:17 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 11 18:21:58 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../vertexmanager/ShuffleVertexManager.java | 150 ++++++++------
.../vertexmanager/TestShuffleVertexManager.java | 193 ++++++++++++-------
3 files changed, 214 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd2749e..aced775 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
+ reduce and slow start
TEZ-2956. Handle auto-reduce parallelism when the
totalNumBipartiteSourceTasks is 0
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 9fb5d1e..01dc5a0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -154,12 +154,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
EdgeProperty edgeProperty;
boolean vertexIsConfigured;
BitSet finishedTaskSet;
+ int numTasks;
+ int numVMEventsReceived;
+ long outputSize;
SourceVertexInfo(EdgeProperty edgeProperty) {
this.edgeProperty = edgeProperty;
- if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- finishedTaskSet = new BitSet();
- }
+ finishedTaskSet = new BitSet();
+ }
+
+ int getNumTasks() {
+ return numTasks;
+ }
+
+ int getNumCompletedTasks() {
+ return finishedTaskSet.cardinality();
}
}
@@ -453,6 +462,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
// TODO what if derived class has already called this
+ // register for status update from all source vertices
getContext().registerForVertexStateUpdates(entry.getKey(),
EnumSet.of(VertexState.CONFIGURED));
if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
@@ -469,7 +479,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// track the tasks in this vertex
updatePendingTasks();
- updateSourceTaskCount();
LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
" with " + totalNumBipartiteSourceTasks + " source tasks and " +
@@ -489,19 +498,20 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
- updateSourceTaskCount();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
-
- if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- //handle duplicate events for bipartite sources
- BitSet completedSourceTasks = srcInfo.finishedTaskSet;
- if (completedSourceTasks != null) {
- // duplicate notifications tracking
- if (!completedSourceTasks.get(srcTaskId)) {
- completedSourceTasks.set(srcTaskId);
- // source task has completed
- ++numBipartiteSourceTasksCompleted;
- }
+ if (srcInfo.vertexIsConfigured) {
+ Preconditions.checkState(srcTaskId < srcInfo.numTasks,
+ "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName +
+ " has only " + srcInfo.numTasks + " tasks");
+ }
+ //handle duplicate events and count task completions from all source vertices
+ BitSet completedSourceTasks = srcInfo.finishedTaskSet;
+ // duplicate notifications tracking
+ if (!completedSourceTasks.get(srcTaskId)) {
+ completedSourceTasks.set(srcTaskId);
+ // source task has completed
+ if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ numBipartiteSourceTasksCompleted++;
}
}
schedulePendingTasks();
@@ -516,9 +526,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
LOG.info("Ignoring vertex manager event from: " + producerTask);
return;
}
-
+
+ String vName = producerTask.getVertexIdentifier().getName();
+ SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
+ Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask);
+
numVertexManagerEventsReceived++;
+ long sourceTaskOutputSize = 0;
if (vmEvent.getUserPayload() != null) {
// save output size
VertexManagerEventPayloadProto proto;
@@ -527,15 +542,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException(e);
}
- long sourceTaskOutputSize = proto.getOutputSize();
+ sourceTaskOutputSize = proto.getOutputSize();
+
+ srcInfo.numVMEventsReceived++;
+ srcInfo.outputSize += sourceTaskOutputSize;
completedSourceTasksOutputSize += sourceTaskOutputSize;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received info of output size: " + sourceTaskOutputSize
- + " numInfoReceived: " + numVertexManagerEventsReceived
- + " total output size: " + completedSourceTasksOutputSize);
- }
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier()
+ + " received info of output size: " + sourceTaskOutputSize
+ + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived
+ + " vertex output size: " + srcInfo.outputSize
+ + " total numEventsReceived: " + numVertexManagerEventsReceived
+ + " total output size: " + completedSourceTasksOutputSize);
+ }
}
void updatePendingTasks() {
@@ -554,22 +575,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
});
}
- void updateSourceTaskCount() {
- // track source vertices
- int numSrcTasks = 0;
- Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
- for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
- numSrcTasks += getContext().getVertexNumTasks(entry.getKey());
- }
- totalNumBipartiteSourceTasks = numSrcTasks;
- }
-
/**
* Compute optimal parallelism needed for the job
* @return true (if parallelism is determined), false otherwise
*/
@VisibleForTesting
- boolean determineParallelismAndApply() {
+ boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) {
if(numVertexManagerEventsReceived == 0) {
if (totalNumBipartiteSourceTasks > 0) {
return true;
@@ -585,21 +596,28 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
desiredTaskInputDataSize)
- && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+ && (minSourceVertexCompletedTaskFraction < slowStartMaxSrcCompletionFraction);
if (canDetermineParallelismLater) {
LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
+ ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks
+ ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
+ ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
- + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + ", maxThreshold="
- + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+ + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted
+ + ", minSourceVertexCompletedTaskFraction=" + minSourceVertexCompletedTaskFraction);
return false;
}
+ // Change this to use per partition stats for more accuracy TEZ-2962.
+ // Instead of aggregating overall size and then dividing equally - coalesce partitions until
+ // desired per partition size is achieved.
long expectedTotalSourceTasksOutputSize = 0;
- if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
- expectedTotalSourceTasksOutputSize =
- (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
+ // this assumes that 1 vmEvent is received per completed task - TEZ-2961
+ expectedTotalSourceTasksOutputSize +=
+ (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+ }
}
int desiredTaskParallelism =
@@ -667,8 +685,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
return true;
}
-
- void schedulePendingTasks(int numTasksToSchedule) {
+
+ void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
// determine parallelism before scheduling the first time
// this is the latest we can wait before determining parallelism.
// currently this depends on task completion and so this is the best time
@@ -677,7 +695,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// calculating parallelism or change parallelism while tasks are already
// running then we can create other parameters to trigger this calculation.
if(enableAutoParallelism && !parallelismDetermined) {
- parallelismDetermined = determineParallelismAndApply();
+ parallelismDetermined = determineParallelismAndApply(minSourceVertexCompletedTaskFraction);
if (!parallelismDetermined) {
//try to determine parallelism later when more info is available.
return;
@@ -707,10 +725,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
boolean canScheduleTasks() {
for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
- String sourceVertex = entry.getKey();
- int numSourceTasks = getContext().getVertexNumTasks(sourceVertex);
- if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) {
- // vertex not configured
+ // need to check that the vertex is configured because until then we don't know if numTasks==0 is valid
+ if (!entry.getValue().vertexIsConfigured) { // isConfigured
+ // vertex is not configured yet
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: "
+ getContext().getVertexName());
@@ -744,29 +761,38 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
- schedulePendingTasks(numPendingTasks);
+ schedulePendingTasks(numPendingTasks, 1);
return;
}
- float completedSourceTaskFraction = 0f;
- if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks
- completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted / totalNumBipartiteSourceTasks;
- } else {
- completedSourceTaskFraction = 1;
+ float minSourceVertexCompletedTaskFraction = 1f;
+ String minCompletedVertexName = "";
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ // canScheduleTasks check has already verified all sources are configured
+ Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " + vInfo.getKey());
+ if (srcInfo.numTasks > 0) {
+ int numCompletedTasks = srcInfo.getNumCompletedTasks();
+ float completedFraction = (float) numCompletedTasks / srcInfo.numTasks;
+ if (minSourceVertexCompletedTaskFraction > completedFraction) {
+ minSourceVertexCompletedTaskFraction = completedFraction;
+ minCompletedVertexName = vInfo.getKey();
+ }
+ }
}
// start scheduling when source tasks completed fraction is more than min.
// linearly increase the number of scheduled tasks such that all tasks are
// scheduled when source tasks completed fraction reaches max
- float tasksFractionToSchedule = 1;
+ float tasksFractionToSchedule = 1;
float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
if (percentRange > 0) {
tasksFractionToSchedule =
- (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
+ (minSourceVertexCompletedTaskFraction - slowStartMinSrcCompletionFraction)/
percentRange;
} else {
// min and max are equal. schedule 100% on reaching min
- if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+ if(minSourceVertexCompletedTaskFraction < slowStartMinSrcCompletionFraction) {
tasksFractionToSchedule = 0;
}
}
@@ -784,10 +810,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
getContext().getVertexName() + " with totalTasks: " +
totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted +
" source tasks completed out of " + totalNumBipartiteSourceTasks +
- ". SourceTaskCompletedFraction: " + completedSourceTaskFraction +
+ ". MinSourceTaskCompletedFraction: " + minSourceVertexCompletedTaskFraction +
+ " in Vertex: " + minCompletedVertexName +
" min: " + slowStartMinSrcCompletionFraction +
" max: " + slowStartMaxSrcCompletionFraction);
- schedulePendingTasks(numTasksToSchedule);
+ schedulePendingTasks(numTasksToSchedule, minSourceVertexCompletedTaskFraction);
}
}
@@ -857,8 +884,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName());
Preconditions.checkState(vInfo.vertexIsConfigured == false);
vInfo.vertexIsConfigured = true;
+ vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName());
+ if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ totalNumBipartiteSourceTasks += vInfo.numTasks;
+ }
LOG.info("Received configured notification : " + stateUpdate.getVertexState() + " for vertex: "
- + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+ + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName() +
+ " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks);
schedulePendingTasks();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index f3f3444..9a9ff27 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -236,10 +236,12 @@ public class TestShuffleVertexManager {
// check waiting for notification before scheduling
Assert.assertFalse(manager.pendingTasks.isEmpty());
- // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
+ // source vertices have 0 tasks. triggers scheduling
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.isEmpty());
- verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(1)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
@@ -252,13 +254,15 @@ public class TestShuffleVertexManager {
verify(mockContext, times(3)).vertexReconfigurationPlanned();
// source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
// normally this event will not come before onVertexStarted() is called
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
// trigger start and processing of pending notification events
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
- verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(2)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -269,30 +273,31 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1);
// parallelism not change due to large data size
manager = createManager(conf, mockContext, 0.1f, 0.1f);
verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
manager.onVertexManagerEventReceived(vmEvent);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
// TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
/**
@@ -304,41 +309,42 @@ public class TestShuffleVertexManager {
//min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
manager = createManager(conf, mockContext, 0.01f, 0.75f);
manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//First task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent); //small payload
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
- //Second task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ //First task in src2 completed with small payload
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent); //small payload
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
//Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
//First task in src2 completed (with larger payload) to trigger determining parallelism
- vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(1, manager.pendingTasks.size());
Assert.assertEquals(1, scheduledTasks.size());
@@ -351,10 +357,11 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
scheduledTasks.clear();
- vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
//min/max fraction of 0.0/0.2
manager = createManager(conf, mockContext, 0.0f, 0.2f);
+ // initial invocation count == 3
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
@@ -363,21 +370,25 @@ public class TestShuffleVertexManager {
Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//send 8 events with payload size as 100
- for(int i=0;i<7;i++) {
- manager.onVertexManagerEventReceived(vmEvent); //small payload
+ for(int i=0;i<8;i++) {
+ //small payload - create new event each time or it will be ignored (from same task)
+ manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
//should not change parallelism
- verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ }
+ for(int i=0;i<3;i++) {
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
}
- //send 8th event with payload size as 100
- manager.onVertexManagerEventReceived(vmEvent);
-
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
//Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
- verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
+ // parallelism updated
+ verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ // check exact update value - 8 events with 100 each => 20 -> 2000 => 2 tasks (with 1000 per task)
+ verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
//reset context for next test
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -399,7 +410,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertEquals(4, manager.pendingTasks.size());
@@ -413,14 +424,15 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// managedVertex tasks reduced
- verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
// TODO improve tests for parallelism
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -433,7 +445,7 @@ public class TestShuffleVertexManager {
// more completions don't cause recalculation of parallelism
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -502,7 +514,8 @@ public class TestShuffleVertexManager {
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-
+ when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
+
// fail if there is no bipartite src vertex
mockInputVertices.put(mockSrcVertexId3, eProp3);
try {
@@ -537,6 +550,9 @@ public class TestShuffleVertexManager {
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -572,8 +588,9 @@ public class TestShuffleVertexManager {
}
// source vertex have some tasks. min > default and max undefined
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+ int numTasks = 20;
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.8f, null);
@@ -581,18 +598,17 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+
Assert.assertEquals(3, manager.pendingTasks.size());
- Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
- int completedTasks = 0;
+ float completedTasksThreshold = 0.8f * numTasks;
// Finish all tasks before exceeding the threshold
for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
- ++completedTasks;
- if ((completedTasks + 1) >= completedTasksThreshold) {
+ // complete 0th tasks outside the loop
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
+ if ((i + 2) >= completedTasksThreshold) {
// stop before completing more than min/max source tasks
break;
}
@@ -603,7 +619,10 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
// Cross the min/max threshold to schedule all tasks
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ Assert.assertEquals(3, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size());
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(0, manager.pendingTasks.size());
Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
@@ -614,13 +633,13 @@ public class TestShuffleVertexManager {
// source vertex have some tasks. min, max == 0
manager = createManager(conf, mockContext, 0.f, 0.f);
manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
// all source vertices need to be configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -637,10 +656,15 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
+ // task completion on only 1 SG edge does nothing
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// min, max > 0 and min == max == absolute max 1.0
manager = createManager(conf, mockContext, 1.0f, 1.0f);
@@ -696,6 +720,10 @@ public class TestShuffleVertexManager {
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ // reset vertices for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
+
// min, max > 0 and min < max
manager = createManager(conf, mockContext, 0.25f, 0.75f);
manager.onVertexStarted(emptyCompletions);
@@ -703,26 +731,30 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// completion of same task again should not get counted
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 2);
+ Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
scheduledTasks.clear();
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
// min, max > 0 and min < max
manager = createManager(conf, mockContext, 0.25f, 1.0f);
@@ -731,20 +763,24 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 1);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
}
@@ -798,7 +834,7 @@ public class TestShuffleVertexManager {
when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
// check initialization
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
@@ -821,11 +857,11 @@ public class TestShuffleVertexManager {
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Send events for all tasks of m3.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
@@ -833,23 +869,34 @@ public class TestShuffleVertexManager {
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
- //Send an event for m2. But still we need to wait for at least 1 event from r1.
+ //Send events for m2. But still we need to wait for at least 1 event from r1.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
+
+ // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Ensure that setVertexParallelism is not called for R2.
verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
+
+ //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+ when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
+
// complete configuration of r1 triggers the scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class),
anyMap());
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
+ Assert.assertTrue(scheduledTasks.size() == 1);
//try with zero task vertices
scheduledTasks.clear();
@@ -863,14 +910,13 @@ public class TestShuffleVertexManager {
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
// Only need completed configuration notification from m3
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -989,7 +1035,7 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
- //Send an event for m2.
+ //Send an event for m3.
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1047,8 +1093,9 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 0);
- // event from m3 triggers scheduling. no need for m2 since it has 0 tasks
+ // event from m3 triggers scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1065,6 +1112,8 @@ public class TestShuffleVertexManager {
//Send 1 event for a task of r1.
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);