You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/10/19 21:56:46 UTC
tez git commit: TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs (jeagles)
Repository: tez
Updated Branches:
refs/heads/master 8033e3d58 -> ed0361124
TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed036112
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed036112
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed036112
Branch: refs/heads/master
Commit: ed03611245423c89a9881af8bdc85ab909992a5d
Parents: 8033e3d
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 19 16:56:26 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 19 16:56:26 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../vertexmanager/ShuffleVertexManager.java | 29 ++++--
.../vertexmanager/TestShuffleVertexManager.java | 97 ++++++++++++++++++--
3 files changed, 113 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ed036112/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca8898b..f2e362b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
TEZ-3430. Make split sorting optional.
TEZ-3466. Tez classpath building to mimic mapreduce classpath building.
@@ -129,6 +130,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
TEZ-3464. Fix findbugs warnings in tez-dag mainLoop
TEZ-3330. Propagate additional config parameters when running MR jobs via Tez.
@@ -626,6 +628,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
TEZ-3464. Fix findbugs warnings in tez-dag mainLoop
TEZ-3335. DAG client thinks app is still running when app status is null
http://git-wip-us.apache.org/repos/asf/tez/blob/ed036112/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 0bb2753..9937bd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -45,6 +45,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.ShuffleEd
import javax.annotation.Nullable;
import java.io.IOException;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -446,13 +447,17 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase {
// Change this to use per partition stats for more accuracy TEZ-2962.
// Instead of aggregating overall size and then dividing equally - coalesce partitions until
// desired per partition size is achieved.
- long expectedTotalSourceTasksOutputSize = 0;
+ BigInteger expectedTotalSourceTasksOutputSize = BigInteger.ZERO;
for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
SourceVertexInfo srcInfo = vInfo.getValue();
if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
// this assumes that 1 vmEvent is received per completed task - TEZ-2961
- expectedTotalSourceTasksOutputSize +=
- (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+ // Estimate total size by projecting based on the current average size per event
+ BigInteger srcOutputSize = BigInteger.valueOf(srcInfo.outputSize);
+ BigInteger srcNumTasks = BigInteger.valueOf(srcInfo.numTasks);
+ BigInteger srcNumVMEventsReceived = BigInteger.valueOf(srcInfo.numVMEventsReceived);
+ BigInteger expectedSrcOutputSize = srcOutputSize.multiply(srcNumTasks).divide(srcNumVMEventsReceived);
+ expectedTotalSourceTasksOutputSize = expectedTotalSourceTasksOutputSize.add(expectedSrcOutputSize);
}
}
@@ -464,10 +469,20 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase {
(totalNumBipartiteSourceTasks * config.getMaxFraction()),
numBipartiteSourceTasksCompleted);
- int desiredTaskParallelism =
- (int)((expectedTotalSourceTasksOutputSize +
- config.getDesiredTaskInputDataSize() - 1) /
- config.getDesiredTaskInputDataSize());
+ // Calculate number of desired tasks by dividing with rounding up
+ BigInteger desiredTaskInputDataSize = BigInteger.valueOf(config.getDesiredTaskInputDataSize());
+ BigInteger desiredTaskInputDataSizeMinusOne = BigInteger.valueOf(config.getDesiredTaskInputDataSize() - 1);
+ BigInteger bigDesiredTaskParallelism =
+ expectedTotalSourceTasksOutputSize.add(desiredTaskInputDataSizeMinusOne).divide(desiredTaskInputDataSize);
+
+ if(bigDesiredTaskParallelism.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0) {
+ LOG.info("Not reducing auto parallelism for vertex: {}"
+ + " since the desired parallelism of {} is greater than or equal"
+ + " to the max parallelism of {}", getContext().getVertexName(),
+ bigDesiredTaskParallelism, Integer.MAX_VALUE);
+ return null;
+ }
+ int desiredTaskParallelism = bigDesiredTaskParallelism.intValue();
if(desiredTaskParallelism < mgrConfig.getMinTaskParallelism()) {
desiredTaskParallelism = mgrConfig.getMinTaskParallelism();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ed036112/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 128d6fa..ad4cceb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -512,6 +512,70 @@ public class TestShuffleVertexManager {
// parallelism changed due to small data size
scheduledTasks.clear();
+ // Ensure long overflow doesn't cause parallelism to be reduced mistakenly.
+ // Overflow could previously occur when output size * num tasks for a single vertex exceeded Long.MAX_VALUE.
+ //
+ manager = createManager(conf, mockContext, 1.0f, 1.0f, (long)(Long.MAX_VALUE / 1.5));
+ manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
+ // task completion from non-bipartite stage does nothing
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
+ Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+ // First source 1 task completes
+ vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1);
+ manager.onVertexManagerEventReceived(vmEvent);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ Assert.assertEquals(4, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+ Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+ Assert.assertEquals(0L, manager.completedSourceTasksOutputSize);
+ // Second source 1 task completes
+ vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1);
+ manager.onVertexManagerEventReceived(vmEvent);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ Assert.assertEquals(4, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(0L, manager.completedSourceTasksOutputSize);
+ // First source 2 task completes
+ vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2);
+ manager.onVertexManagerEventReceived(vmEvent);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertEquals(4, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+ Assert.assertEquals(3, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(Long.MAX_VALUE >> 1, manager.completedSourceTasksOutputSize);
+ // Second source 2 task completes
+ vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2);
+ manager.onVertexManagerEventReceived(vmEvent);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+ // Auto-reduce is triggered
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ Assert.assertEquals(2, newEdgeManagers.size());
+ Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
+ Assert.assertEquals(2, scheduledTasks.size());
+ Assert.assertTrue(scheduledTasks.contains(new Integer(0)));
+ Assert.assertTrue(scheduledTasks.contains(new Integer(1)));
+ Assert.assertEquals(4, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(4, manager.numVertexManagerEventsReceived);
+ Assert.assertEquals(Long.MAX_VALUE >> 1 << 1, manager.completedSourceTasksOutputSize);
+
+ //reset context for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
+ // parallelism changed due to small data size
+ scheduledTasks.clear();
+
manager = createManager(conf, mockContext, 0.5f, 0.5f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
@@ -545,8 +609,8 @@ public class TestShuffleVertexManager {
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// managedVertex tasks reduced
- verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
- verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(4)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
// TODO improve tests for parallelism
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -559,7 +623,7 @@ public class TestShuffleVertexManager {
// more completions dont cause recalculation of parallelism
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -1428,16 +1492,25 @@ public class TestShuffleVertexManager {
}
private ShuffleVertexManagerBase createManager(Configuration conf,
- VertexManagerPluginContext context, Float min, Float max) {
+ VertexManagerPluginContext context, Float min, Float max, Long size) {
+ if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) {
+ return createShuffleVertexManager(conf, context, min, max, size);
+ } else {
+ return null;
+ }
+ }
+
+ private ShuffleVertexManagerBase createManager(Configuration conf,
+ VertexManagerPluginContext context, Float min, Float max) {
if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) {
- return createShuffleVertexManager(conf, context, min, max);
+ return createShuffleVertexManager(conf, context, min, max, null);
} else {
return null;
}
}
private ShuffleVertexManager createShuffleVertexManager(Configuration conf,
- VertexManagerPluginContext context, Float min, Float max) {
+ VertexManagerPluginContext context, Float min, Float max, Long size) {
if (min != null) {
conf.setFloat(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
@@ -1456,9 +1529,15 @@ public class TestShuffleVertexManager {
conf.setBoolean(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
- conf.setLong(
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- 1000L);
+ if (size != null) {
+ conf.setLong(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ size);
+ } else {
+ conf.setLong(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ 1000L);
+ }
UserPayload payload;
try {
payload = TezUtils.createUserPayloadFromConf(conf);