You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2014/09/23 02:30:56 UTC
git commit: TEZ-978. Enhance auto parallelism tuning for queries
having empty outputs or data skewness (Rajesh Balamohan)
Repository: tez
Updated Branches:
refs/heads/master 4023898c1 -> 9159e1170
TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9159e117
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9159e117
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9159e117
Branch: refs/heads/master
Commit: 9159e11700d3eff722fdda615565528182a619d2
Parents: 4023898
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Sep 23 05:59:50 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Sep 23 05:59:50 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../vertexmanager/ShuffleVertexManager.java | 60 ++++++++---
.../vertexmanager/TestShuffleVertexManager.java | 105 ++++++++++++++++++-
3 files changed, 146 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 407acc9..f3b2ed0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@ ALL CHANGES
TEZ-1550. TestEnvironmentUpdateUtils.testMultipleUpdateEnvironment fails on
Windows
TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows
+ TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 85d46d5..2aaae16 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.library.vertexmanager;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -120,7 +121,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int minTaskParallelism = 1;
boolean enableAutoParallelism = false;
boolean parallelismDetermined = false;
-
+
int totalNumSourceTasks = 0;
int numSourceTasksCompleted = 0;
int numVertexManagerEventsReceived = 0;
@@ -382,18 +383,43 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
totalNumSourceTasks = numSrcTasks;
}
- void determineParallelismAndApply() {
+ /**
+ * Compute the optimal parallelism for the managed vertex and apply it if it is lower.
+ * @return false if the decision is deferred until more source output is known, true otherwise
+ */
+ @VisibleForTesting
+ boolean determineParallelismAndApply() {
if(numSourceTasksCompleted == 0) {
- return;
+ return true;
}
if(numVertexManagerEventsReceived == 0) {
- return;
+ return true;
}
int currentParallelism = pendingTasks.size();
- long expectedTotalSourceTasksOutputSize =
- (totalNumSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
+ /**
+ * When the total output size reported by completed source tasks is still below
+ * desiredTaskInputDataSize, wait for more data before determining parallelism,
+ * but only until the max.fraction of source tasks has completed. min.fraction is just
+ * a hint to the framework and need not be honored strictly in this case.
+ */
+ boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
+ desiredTaskInputDataSize)
+ && (numSourceTasksCompleted < (totalNumSourceTasks * slowStartMaxSrcCompletionFraction));
+ if (canDetermineParallelismLater) {
+ LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
+ + ", totalNumSourceTasks=" + totalNumSourceTasks
+ + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
+ + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
+ + ", numSourceTasksCompleted=" + numSourceTasksCompleted + ", maxThreshold="
+ + (totalNumSourceTasks * slowStartMaxSrcCompletionFraction));
+ return false;
+ }
+
+ long expectedTotalSourceTasksOutputSize =
+ (totalNumSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+
int desiredTaskParallelism =
(int)(
(expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
@@ -403,7 +429,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
if(desiredTaskParallelism >= currentParallelism) {
- return;
+ return true;
}
// most shufflers will be assigned this range
@@ -411,7 +437,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
if (basePartitionRange <= 1) {
// nothing to do if range is equal 1 partition. shuffler does it by default
- return;
+ return true;
}
int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
@@ -425,7 +451,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
+ " . Expected output: " + expectedTotalSourceTasksOutputSize
+ " based on actual output: " + completedSourceTasksOutputSize
+ " from " + numVertexManagerEventsReceived + " vertex manager events. "
- + " desiredTaskInputSize: " + desiredTaskInputDataSize);
+ + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" +
+ (totalNumSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" +
+ numSourceTasksCompleted);
if(finalTaskParallelism < currentParallelism) {
// final parallelism is less than actual parallelism
@@ -447,8 +475,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
- updatePendingTasks();
+ updatePendingTasks();
}
+ return true;
}
void schedulePendingTasks(int numTasksToSchedule) {
@@ -460,9 +489,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// calculating parallelism or change parallelism while tasks are already
// running then we can create other parameters to trigger this calculation.
if(enableAutoParallelism && !parallelismDetermined) {
- // do this once
- parallelismDetermined = true;
- determineParallelismAndApply();
+ parallelismDetermined = determineParallelismAndApply();
+ if (!parallelismDetermined) {
+ //try to determine parallelism later when more info is available.
+ return;
+ }
}
List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
@@ -498,8 +529,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// linearly increase the number of scheduled tasks such that all tasks are
// scheduled when source tasks completed fraction reaches max
float tasksFractionToSchedule = 1;
- float percentRange = slowStartMaxSrcCompletionFraction -
- slowStartMinSrcCompletionFraction;
+ float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
if (percentRange > 0) {
tasksFractionToSchedule =
(completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4768c6c..9ac8210 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -232,14 +232,105 @@ public class TestShuffleVertexManager {
Assert.assertEquals(4, scheduledTasks.size());
Assert.assertEquals(1, manager.numSourceTasksCompleted);
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
-
-
+
+ /**
+ * Test for TEZ-978
+ * Delay determining parallelism until enough data has been received.
+ */
+ scheduledTasks.clear();
+ payload =
+ VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
+ vmEvent = VertexManagerEvent.create("Vertex", payload);
+
+ //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
+ manager = createManager(conf, mockContext, 0.01f, 0.75f);
+ manager.onVertexStarted(null);
+ Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(4, manager.totalNumSourceTasks);
+ Assert.assertEquals(0, manager.numSourceTasksCompleted);
+
+ //First task in src1 completed with small payload
+ manager.onVertexManagerEventReceived(vmEvent); //small payload
+ manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertEquals(4, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+ Assert.assertEquals(1, manager.numSourceTasksCompleted);
+ Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+ Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
+
+ //Second small-payload event from src1 (task index 0 is reported again; completed count is asserted to stay 1)
+ manager.onVertexManagerEventReceived(vmEvent); //small payload
+ manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
+ Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertEquals(4, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+ Assert.assertEquals(1, manager.numSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
+ Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
+
+ //First task in src2 completed (with larger payload) to trigger determining parallelism
+ payload =
+ VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString()
+ .asReadOnlyByteBuffer();
+ vmEvent = VertexManagerEvent.create("Vertex", payload);
+ manager.onVertexManagerEventReceived(vmEvent);
+ Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+ verify(mockContext, times(1)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+ anyMap(),
+ anyMap());
+ manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ Assert.assertEquals(1, manager.pendingTasks.size());
+ Assert.assertEquals(1, scheduledTasks.size());
+ Assert.assertEquals(2, manager.numSourceTasksCompleted);
+ Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
+ Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
+
+ //Test for max fraction. The min fraction is only a hint to the framework, but the max fraction must be honored
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
+ scheduledTasks.clear();
+ payload =
+ VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString()
+ .asReadOnlyByteBuffer();
+ vmEvent = VertexManagerEvent.create("Vertex", payload);
+
+ //min/max fraction of 0.0/0.2
+ manager = createManager(conf, mockContext, 0.0f, 0.2f);
+ manager.onVertexStarted(null);
+ Assert.assertEquals(40, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(40, manager.totalNumSourceTasks);
+ Assert.assertEquals(0, manager.numSourceTasksCompleted);
+ //send 7 events with payload size as 100
+ for(int i=0;i<7;i++) {
+ manager.onVertexManagerEventReceived(vmEvent); //small payload
+ manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
+ //should not change parallelism
+ verify(mockContext, times(0)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
+ anyMap(),
+ anyMap());
+ }
+ //send 8th event with payload size as 100
+ manager.onVertexManagerEventReceived(vmEvent);
+ manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(8));
+ //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
+ verify(mockContext, times(1)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
+ anyMap(),
+ anyMap());
+
+ //reset context for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
// parallelism changed due to small data size
scheduledTasks.clear();
payload =
VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
vmEvent = VertexManagerEvent.create("Vertex", payload);
-
+
manager = createManager(conf, mockContext, 0.5f, 0.5f);
manager.onVertexStarted(null);
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
@@ -266,7 +357,9 @@ public class TestShuffleVertexManager {
manager.onVertexManagerEventReceived(vmEvent);
manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
// managedVertex tasks reduced
- verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
+ verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+ anyMap(),
+ anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
// TODO improve tests for parallelism
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -279,7 +372,9 @@ public class TestShuffleVertexManager {
// more completions dont cause recalculation of parallelism
manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
- verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
+ verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+ anyMap(),
+ anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();