You are viewing a plain-text version of this content; the canonical link to it is not included in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2016/02/23 05:45:49 UTC

tez git commit: TEZ-3126. Log reason for not reducing parallelism (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 44ca2295f -> fd75e6403


TEZ-3126. Log reason for not reducing parallelism (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fd75e640
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fd75e640
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fd75e640

Branch: refs/heads/master
Commit: fd75e640396da8d5e1c67ef554d5db1846e08c69
Parents: 44ca229
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Mon Feb 22 22:45:23 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Mon Feb 22 22:45:23 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../vertexmanager/ShuffleVertexManager.java     | 28 ++++++++++++--------
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fd75e640/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2311c55..516012f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3126. Log reason for not reducing parallelism
   TEZ-3131. Support a way to override test_root_dir for FaultToleranceTestRunner.
   TEZ-3067. Links to tez configs documentation should be bubbled up to top-level release page.
   TEZ-3123. Containers can get re-used even with conflicting local resources.
@@ -338,6 +339,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3126. Log reason for not reducing parallelism
   TEZ-3123. Containers can get re-used even with conflicting local resources.
   TEZ-3117. Deadlock in Edge and Vertex code
   TEZ-3103. Shuffle can hang when memory to memory merging enabled

http://git-wip-us.apache.org/repos/asf/tez/blob/fd75e640/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index ea00532..47fc60f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -706,6 +706,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       }
     }
 
+    LOG.info("Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: "
+        + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + " vertex manager events. "
+        + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:"
+        + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:"
+        + numBipartiteSourceTasksCompleted);
+
     int desiredTaskParallelism = 
         (int)(
             (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
@@ -713,16 +719,22 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     if(desiredTaskParallelism < minTaskParallelism) {
       desiredTaskParallelism = minTaskParallelism;
     }
-    
+
     if(desiredTaskParallelism >= currentParallelism) {
+      LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName()
+          + " since the desired parallelism of " + desiredTaskParallelism
+          + " is greater than or equal to the current parallelism of " + pendingTasks.size());
       return true;
     }
-    
+
     // most shufflers will be assigned this range
     basePartitionRange = currentParallelism/desiredTaskParallelism;
     
     if (basePartitionRange <= 1) {
       // nothing to do if range is equal 1 partition. shuffler does it by default
+      LOG.info("Not reducing auto parallelism for vertex: " + getContext().getVertexName()
+          + " by less than half since combining two inputs will potentially break the desired task input size of "
+          + desiredTaskInputDataSize);
       return true;
     }
     
@@ -732,15 +744,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
           (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
 
-    LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName()
-        + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
-        + " . Expected output: " + expectedTotalSourceTasksOutputSize 
-        + " based on actual output: " + completedSourceTasksOutputSize
-        + " from " + numVertexManagerEventsReceived + " vertex manager events. "
-        + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" +
-        (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" +
-        numBipartiteSourceTasksCompleted);
-          
+    LOG.info("Reducing auto parallelism for vertex: " + getContext().getVertexName()
+        + " from " + pendingTasks.size() + " to " + finalTaskParallelism);
+
     if(finalTaskParallelism < currentParallelism) {
       // final parallelism is less than actual parallelism
       Map<String, EdgeProperty> edgeProperties =