You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/18 03:33:55 UTC

git commit: TEZ-903. Make max maps per fetcher configurable in ShuffleScheduler. Contributed by Rajesh Balamohan.

Repository: incubator-tez
Updated Branches:
  refs/heads/master ba89a0847 -> ae6c9746c


TEZ-903. Make max maps per fetcher configurable in ShuffleScheduler.
Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae6c9746
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae6c9746
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae6c9746

Branch: refs/heads/master
Commit: ae6c9746ce9da1cf6ffcff4834af352c58fe83d8
Parents: ba89a08
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 17 19:32:49 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 17 19:32:49 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/common/TezJobConfig.java  |  9 +++++++++
 .../library/common/shuffle/impl/ShuffleScheduler.java  | 13 ++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae6c9746/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 69edb8b..61a1b64 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -196,6 +196,15 @@ public class TezJobConfig {
       "tez.runtime.shuffle.fetch.failures.limit";
   public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 5;
 
+
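+  /**
+   * Maximum number of task outputs (maps) ShuffleScheduler hands to a fetcher at once; values below 1 are treated as 1.
+   */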
+  public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE =
+    "tez.runtime.shuffle.fetch.max.task.output.at.once";
+  public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE
+          = 20;
+
   /**
    * 
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae6c9746/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 1b34430..ad197f1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -57,7 +57,6 @@ class ShuffleScheduler {
   };
 
   private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
-  private static final int MAX_MAPS_AT_ONCE = 20;
   private static final long INITIAL_PENALTY = 2000l; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   
@@ -90,6 +89,7 @@ class ShuffleScheduler {
   private final long startTime;
   private long lastProgressTime;
 
+  private int maxTaskOutputAtOnce;
   private int maxFetchFailuresBeforeReporting;
   private boolean reportReadErrorImmediately = true; 
   private int maxFailedUniqueFetches = 5;
@@ -135,6 +135,9 @@ class ShuffleScheduler {
         conf.getBoolean(
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE));
     
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
@@ -418,10 +421,10 @@ class ShuffleScheduler {
     // find the maps that we still need, up to the limit
     while (dedupedItr.hasNext()) {
       InputAttemptIdentifier id = dedupedItr.next().getValue();
-        result.add(id);
-        if (++includedMaps >= MAX_MAPS_AT_ONCE) {
-          break;
-        }
+      result.add(id);
+      if (++includedMaps >= maxTaskOutputAtOnce) {
+        break;
+      }
     }
 
     // put back the maps left after the limit