You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/10 02:00:43 UTC
tez git commit: TEZ-2943. Change shuffle vertex manager to use per
vertex data for auto reduce and slow start (bikas)
Repository: tez
Updated Branches:
refs/heads/master 5af06047f -> 0dd68ad1c
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0dd68ad1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0dd68ad1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0dd68ad1
Branch: refs/heads/master
Commit: 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d
Parents: 5af0604
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Dec 9 17:00:17 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Dec 9 17:00:17 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../vertexmanager/ShuffleVertexManager.java | 142 +++++++------
.../vertexmanager/TestShuffleVertexManager.java | 199 +++++++++++--------
3 files changed, 209 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9031566..cfa93bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
+ reduce and slow start
TEZ-2346. TEZ-UI: Lazy load other info / counter data
TEZ-2975. Bump up apache commons dependency.
TEZ-2970. Re-localization in TezChild does not use correct UGI.
http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f10c89a..c88c7a2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -168,12 +168,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
EdgeProperty edgeProperty;
boolean vertexIsConfigured;
BitSet finishedTaskSet;
+ int numTasks;
+ int numVMEventsReceived;
+ long outputSize;
SourceVertexInfo(EdgeProperty edgeProperty) {
this.edgeProperty = edgeProperty;
- if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- finishedTaskSet = new BitSet();
- }
+ finishedTaskSet = new BitSet();
+ }
+
+ int getNumTasks() {
+ return numTasks;
+ }
+
+ int getNumCompletedTasks() {
+ return finishedTaskSet.cardinality();
}
}
@@ -482,6 +491,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
// TODO what if derived class has already called this
+ // register for status update from all source vertices
getContext().registerForVertexStateUpdates(entry.getKey(),
EnumSet.of(VertexState.CONFIGURED));
if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
@@ -498,7 +508,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// track the tasks in this vertex
updatePendingTasks();
- updateSourceTaskCount();
LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
" with " + totalNumBipartiteSourceTasks + " source tasks and " +
@@ -519,19 +528,20 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
- updateSourceTaskCount();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
-
- if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- //handle duplicate events for bipartite sources
- BitSet completedSourceTasks = srcInfo.finishedTaskSet;
- if (completedSourceTasks != null) {
- // duplicate notifications tracking
- if (!completedSourceTasks.get(srcTaskId)) {
- completedSourceTasks.set(srcTaskId);
- // source task has completed
- ++numBipartiteSourceTasksCompleted;
- }
+ if (srcInfo.vertexIsConfigured) {
+ Preconditions.checkState(srcTaskId < srcInfo.numTasks,
+ "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName +
+ " has only " + srcInfo.numTasks + " tasks");
+ }
+ //handle duplicate events and count task completions from all source vertices
+ BitSet completedSourceTasks = srcInfo.finishedTaskSet;
+ // duplicate notifications tracking
+ if (!completedSourceTasks.get(srcTaskId)) {
+ completedSourceTasks.set(srcTaskId);
+ // source task has completed
+ if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ numBipartiteSourceTasksCompleted++;
}
}
schedulePendingTasks();
@@ -564,7 +574,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
LOG.info("Ignoring vertex manager event from: " + producerTask);
return;
}
-
+
+ String vName = producerTask.getVertexIdentifier().getName();
+ SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
+ Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask);
+
numVertexManagerEventsReceived++;
long sourceTaskOutputSize = 0;
@@ -591,12 +605,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
throw new TezUncheckedException(e);
}
}
+ srcInfo.numVMEventsReceived++;
+ srcInfo.outputSize += sourceTaskOutputSize;
completedSourceTasksOutputSize += sourceTaskOutputSize;
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Received info of output size: " + sourceTaskOutputSize
- + " numInfoReceived: " + numVertexManagerEventsReceived
+ LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier()
+ + " received info of output size: " + sourceTaskOutputSize
+ + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived
+ + " vertex output size: " + srcInfo.outputSize
+ + " total numEventsReceived: " + numVertexManagerEventsReceived
+ " total output size: " + completedSourceTasksOutputSize);
}
}
@@ -613,7 +632,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
totalTasksToSchedule = pendingTasks.size();
if (stats == null) {
- stats = new long[totalTasksToSchedule];
+ stats = new long[totalTasksToSchedule]; // TODO lost previous data
}
}
@@ -625,22 +644,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
});
}
- void updateSourceTaskCount() {
- // track source vertices
- int numSrcTasks = 0;
- Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
- for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
- numSrcTasks += getContext().getVertexNumTasks(entry.getKey());
- }
- totalNumBipartiteSourceTasks = numSrcTasks;
- }
-
/**
* Compute optimal parallelism needed for the job
* @return true (if parallelism is determined), false otherwise
*/
@VisibleForTesting
- boolean determineParallelismAndApply() {
+ boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) {
if(numVertexManagerEventsReceived == 0) {
if (totalNumBipartiteSourceTasks > 0) {
return true;
@@ -656,21 +665,28 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
desiredTaskInputDataSize)
- && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+ && (minSourceVertexCompletedTaskFraction < slowStartMaxSrcCompletionFraction);
if (canDetermineParallelismLater) {
LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
+ ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks
+ ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
+ ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
- + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + ", maxThreshold="
- + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+ + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted
+ + ", minSourceVertexCompletedTaskFraction=" + minSourceVertexCompletedTaskFraction);
return false;
}
+ // Change this to use per partition stats for more accuracy TEZ-2962.
+ // Instead of aggregating overall size and then dividing equally - coalesce partitions until
+ // desired per partition size is achieved.
long expectedTotalSourceTasksOutputSize = 0;
- if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
- expectedTotalSourceTasksOutputSize =
- (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
+ // this assumes that 1 vmEvent is received per completed task - TEZ-2961
+ expectedTotalSourceTasksOutputSize +=
+ (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+ }
}
int desiredTaskParallelism =
@@ -755,7 +771,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
}
- void schedulePendingTasks(int numTasksToSchedule) {
+ void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
// determine parallelism before scheduling the first time
// this is the latest we can wait before determining parallelism.
// currently this depends on task completion and so this is the best time
@@ -764,7 +780,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// calculating parallelism or change parallelism while tasks are already
// running then we can create other parameters to trigger this calculation.
if(enableAutoParallelism && !parallelismDetermined) {
- parallelismDetermined = determineParallelismAndApply();
+ parallelismDetermined = determineParallelismAndApply(minSourceVertexCompletedTaskFraction);
if (!parallelismDetermined) {
//try to determine parallelism later when more info is available.
return;
@@ -851,10 +867,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
*/
boolean canScheduleTasks() {
for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
- String sourceVertex = entry.getKey();
- int numSourceTasks = getContext().getVertexNumTasks(sourceVertex);
- if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) {
- // vertex not configured
+ // need to check that the vertex is configured because until then we don't know if numTasks==0 is valid
+ if (!entry.getValue().vertexIsConfigured) { // isConfigured
+ // vertex not scheduled tasks
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: "
+ getContext().getVertexName());
@@ -888,29 +903,38 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
- schedulePendingTasks(numPendingTasks);
+ schedulePendingTasks(numPendingTasks, 1);
return;
}
- float completedSourceTaskFraction = 0f;
- if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks
- completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted / totalNumBipartiteSourceTasks;
- } else {
- completedSourceTaskFraction = 1;
+ float minSourceVertexCompletedTaskFraction = 1f;
+ String minCompletedVertexName = "";
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ // canScheduleTasks check has already verified all sources are configured
+ Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " + vInfo.getKey());
+ if (srcInfo.numTasks > 0) {
+ int numCompletedTasks = srcInfo.getNumCompletedTasks();
+ float completedFraction = (float) numCompletedTasks / srcInfo.numTasks;
+ if (minSourceVertexCompletedTaskFraction > completedFraction) {
+ minSourceVertexCompletedTaskFraction = completedFraction;
+ minCompletedVertexName = vInfo.getKey();
+ }
+ }
}
// start scheduling when source tasks completed fraction is more than min.
// linearly increase the number of scheduled tasks such that all tasks are
// scheduled when source tasks completed fraction reaches max
- float tasksFractionToSchedule = 1;
+ float tasksFractionToSchedule = 1;
float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
if (percentRange > 0) {
tasksFractionToSchedule =
- (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
+ (minSourceVertexCompletedTaskFraction - slowStartMinSrcCompletionFraction)/
percentRange;
} else {
// min and max are equal. schedule 100% on reaching min
- if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+ if(minSourceVertexCompletedTaskFraction < slowStartMinSrcCompletionFraction) {
tasksFractionToSchedule = 0;
}
}
@@ -928,10 +952,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
getContext().getVertexName() + " with totalTasks: " +
totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted +
" source tasks completed out of " + totalNumBipartiteSourceTasks +
- ". SourceTaskCompletedFraction: " + completedSourceTaskFraction +
+ ". MinSourceTaskCompletedFraction: " + minSourceVertexCompletedTaskFraction +
+ " in Vertex: " + minCompletedVertexName +
" min: " + slowStartMinSrcCompletionFraction +
" max: " + slowStartMaxSrcCompletionFraction);
- schedulePendingTasks(numTasksToSchedule);
+ schedulePendingTasks(numTasksToSchedule, minSourceVertexCompletedTaskFraction);
}
}
@@ -1002,8 +1027,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName());
Preconditions.checkState(vInfo.vertexIsConfigured == false);
vInfo.vertexIsConfigured = true;
+ vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName());
+ if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ totalNumBipartiteSourceTasks += vInfo.numTasks;
+ }
LOG.info("Received configured notification : " + stateUpdate.getVertexState() + " for vertex: "
- + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+ + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName() +
+ " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks);
schedulePendingTasks();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 862e4df..9d53ebc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -242,10 +242,12 @@ public class TestShuffleVertexManager {
// check waiting for notification before scheduling
Assert.assertFalse(manager.pendingTasks.isEmpty());
- // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
+ // source vertices have 0 tasks. triggers scheduling
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.isEmpty());
- verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(1)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
@@ -258,13 +260,15 @@ public class TestShuffleVertexManager {
verify(mockContext, times(3)).vertexReconfigurationPlanned();
// source vertices have 0 tasks. does not trigger scheduling
// normally this event will not come before onVertexStarted() is called
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
// trigger start and processing of pending notification events
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
- verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(2)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -275,30 +279,31 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1);
// parallelism not change due to large data size
manager = createManager(conf, mockContext, 0.1f, 0.1f);
verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
manager.onVertexManagerEventReceived(vmEvent);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
// TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
/**
@@ -313,11 +318,10 @@ public class TestShuffleVertexManager {
manager = createManager(conf, mockContext, 0.01f, 0.75f);
manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId1));
+ vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
@@ -329,7 +333,7 @@ public class TestShuffleVertexManager {
// sending again from a different version of the same task has no impact
TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
- vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId2));
+ vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
@@ -348,41 +352,42 @@ public class TestShuffleVertexManager {
//min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
manager = createManager(conf, mockContext, 0.01f, 0.75f);
manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//First task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent); //small payload
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
- //Second task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ //First task in src2 completed with small payload
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent); //small payload
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
//Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
//First task in src2 completed (with larger payload) to trigger determining parallelism
- vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(1, manager.pendingTasks.size());
Assert.assertEquals(1, scheduledTasks.size());
@@ -395,10 +400,11 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
scheduledTasks.clear();
- vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
//min/max fraction of 0.0/0.2
manager = createManager(conf, mockContext, 0.0f, 0.2f);
+ // initial invocation count == 3
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
@@ -407,22 +413,25 @@ public class TestShuffleVertexManager {
Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//send 8 events with payload size as 100
- for(int i=0;i<7;i++) {
- manager.onVertexManagerEventReceived(vmEvent); //small payload
+ for(int i=0;i<8;i++) {
+ //small payload - create new event each time or it will be ignored (from same task)
+ manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
//should not change parallelism
- verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ }
+ for(int i=0;i<3;i++) {
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
}
- //send 8th event with payload size as 100
- manager.onVertexManagerEventReceived(vmEvent);
-
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
//Since max threshold (20 * 0.2 = 4 completed tasks in the slowest source vertex) is met, vertex manager should determine parallelism
- verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class),
- anyMap());
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
+ // parallelism updated
+ verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ // check exact update value - 8 events with 100 each => 20 -> 2000 => 2 tasks (with 1000 per task)
+ verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
//reset context for next test
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -444,7 +453,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertEquals(4, manager.pendingTasks.size());
@@ -458,14 +467,15 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// managedVertex tasks reduced
- verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
// TODO improve tests for parallelism
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -478,7 +488,7 @@ public class TestShuffleVertexManager {
// more completions don't cause recalculation of parallelism
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -548,7 +558,8 @@ public class TestShuffleVertexManager {
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-
+ when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
+
// fail if there is no bipartite src vertex
mockInputVertices.put(mockSrcVertexId3, eProp3);
try {
@@ -583,6 +594,9 @@ public class TestShuffleVertexManager {
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -618,8 +632,9 @@ public class TestShuffleVertexManager {
}
// source vertex have some tasks. min > default and max undefined
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+ int numTasks = 20;
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.8f, null);
@@ -627,18 +642,17 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+
Assert.assertEquals(3, manager.pendingTasks.size());
- Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
- int completedTasks = 0;
+ float completedTasksThreshold = 0.8f * numTasks;
// Finish all tasks before exceeding the threshold
for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
- ++completedTasks;
- if ((completedTasks + 1) >= completedTasksThreshold) {
+ // complete 0th tasks outside the loop
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
+ if ((i + 2) >= completedTasksThreshold) {
// stop before completing more than min/max source tasks
break;
}
@@ -649,7 +663,10 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
// Cross the min/max threshold to schedule all tasks
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ Assert.assertEquals(3, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size());
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(0, manager.pendingTasks.size());
Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
@@ -660,13 +677,13 @@ public class TestShuffleVertexManager {
// source vertex have some tasks. min, max == 0
manager = createManager(conf, mockContext, 0.0f, 0.0f);
manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
// all source vertices need to be configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -683,10 +700,15 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
+ // task completion on only 1 SG edge does nothing
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// min, max > 0 and min == max == absolute max 1.0
manager = createManager(conf, mockContext, 1.0f, 1.0f);
@@ -742,6 +764,10 @@ public class TestShuffleVertexManager {
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ // reset vertices for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
+
// min, max > 0 and min < max
manager = createManager(conf, mockContext, 0.25f, 0.75f);
manager.onVertexStarted(emptyCompletions);
@@ -749,26 +775,30 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// completion of same task again should not get counted
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 2);
+ Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
scheduledTasks.clear();
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
// min, max > 0 and min < max
manager = createManager(conf, mockContext, 0.25f, 1.0f);
@@ -777,20 +807,24 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 1);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
- manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
}
@@ -844,7 +878,7 @@ public class TestShuffleVertexManager {
when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
// check initialization
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
@@ -867,11 +901,11 @@ public class TestShuffleVertexManager {
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Send events for all tasks of m3.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
@@ -879,22 +913,29 @@ public class TestShuffleVertexManager {
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
- //Send an event for m2. But still we need to wait for at least 1 event from r1.
+ //Send events for m2. But still we need to wait for at least 1 event from r1.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
+
+ // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Ensure that setVertexParallelism is not called for R2.
verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
- when(mockContext_R2.getVertexNumTasks("R2")).thenReturn(1);
+ when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
// complete configuration of r1 triggers the scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class),
anyMap());
@@ -913,14 +954,13 @@ public class TestShuffleVertexManager {
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
// Only need completed configuration notification from m3
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1032,12 +1072,12 @@ public class TestShuffleVertexManager {
//Tasks should be scheduled in task 2, 0, 1 order
long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)};
- VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
manager.onVertexManagerEventReceived(vmEvent); //send VM event
//stats from another vertex (more of empty stats)
sizes = new long[]{(0l), (0l), (0l)};
- vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2");
+ vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
manager.onVertexManagerEventReceived(vmEvent); //send VM event
//Send an event for m2.
@@ -1139,7 +1179,7 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
- //Send an event for m2.
+ //Send an event for m3.
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1197,8 +1237,9 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 0);
- // event from m3 triggers scheduling. no need for m2 since it has 0 tasks
+ // event from m3 triggers scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1215,6 +1256,8 @@ public class TestShuffleVertexManager {
//Send 1 event for tasks of r1.
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);