You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/24 19:51:38 UTC
[4/9] tez git commit: TEZ-3126. Log reason for not reducing parallelism (jeagles)
TEZ-3126. Log reason for not reducing parallelism (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fd75e640
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fd75e640
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fd75e640
Branch: refs/heads/TEZ-2980
Commit: fd75e640396da8d5e1c67ef554d5db1846e08c69
Parents: 44ca229
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Mon Feb 22 22:45:23 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Mon Feb 22 22:45:23 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../vertexmanager/ShuffleVertexManager.java | 28 ++++++++++++--------
2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fd75e640/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2311c55..516012f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3126. Log reason for not reducing parallelism
TEZ-3131. Support a way to override test_root_dir for FaultToleranceTestRunner.
TEZ-3067. Links to tez configs documentation should be bubbled up to top-level release page.
TEZ-3123. Containers can get re-used even with conflicting local resources.
@@ -338,6 +339,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3126. Log reason for not reducing parallelism
TEZ-3123. Containers can get re-used even with conflicting local resources.
TEZ-3117. Deadlock in Edge and Vertex code
TEZ-3103. Shuffle can hang when memory to memory merging enabled
http://git-wip-us.apache.org/repos/asf/tez/blob/fd75e640/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index ea00532..47fc60f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -706,6 +706,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
}
+ LOG.info("Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: "
+ + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + " vertex manager events. "
+ + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:"
+ + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:"
+ + numBipartiteSourceTasksCompleted);
+
int desiredTaskParallelism =
(int)(
(expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
@@ -713,16 +719,22 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
if(desiredTaskParallelism < minTaskParallelism) {
desiredTaskParallelism = minTaskParallelism;
}
-
+
if(desiredTaskParallelism >= currentParallelism) {
+ LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName()
+ + " since the desired parallelism of " + desiredTaskParallelism
+ + " is greater than or equal to the current parallelism of " + pendingTasks.size());
return true;
}
-
+
// most shufflers will be assigned this range
basePartitionRange = currentParallelism/desiredTaskParallelism;
if (basePartitionRange <= 1) {
// nothing to do if range is equal 1 partition. shuffler does it by default
+ LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName()
+ + " by less than half since combining two inputs will potentially break the desired task input size of "
+ + desiredTaskInputDataSize);
return true;
}
@@ -732,15 +744,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
(numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
- LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName()
- + " to " + finalTaskParallelism + " from " + pendingTasks.size()
- + " . Expected output: " + expectedTotalSourceTasksOutputSize
- + " based on actual output: " + completedSourceTasksOutputSize
- + " from " + numVertexManagerEventsReceived + " vertex manager events. "
- + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" +
- (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" +
- numBipartiteSourceTasksCompleted);
-
+ LOG.info("Reducing auto parallelism for vertex: " + getContext().getVertexName()
+ + " from " + pendingTasks.size() + " to " + finalTaskParallelism);
+
if(finalTaskParallelism < currentParallelism) {
// final parallelism is less than actual parallelism
Map<String, EdgeProperty> edgeProperties =