You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/18 03:33:55 UTC
git commit: TEZ-903. Make max maps per fetcher configurable in
ShuffleScheduler. Contributed by Rajesh Balamohan.
Repository: incubator-tez
Updated Branches:
refs/heads/master ba89a0847 -> ae6c9746c
TEZ-903. Make max maps per fetcher configurable in ShuffleScheduler.
Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae6c9746
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae6c9746
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae6c9746
Branch: refs/heads/master
Commit: ae6c9746ce9da1cf6ffcff4834af352c58fe83d8
Parents: ba89a08
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 17 19:32:49 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 17 19:32:49 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tez/common/TezJobConfig.java | 9 +++++++++
.../library/common/shuffle/impl/ShuffleScheduler.java | 13 ++++++++-----
2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae6c9746/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 69edb8b..61a1b64 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -196,6 +196,15 @@ public class TezJobConfig {
"tez.runtime.shuffle.fetch.failures.limit";
public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 5;
+
+ /**
+ *
+ */
+ public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE =
+ "tez.runtime.shuffle.fetch.max.task.output.at.once";
+ public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE
+ = 20;
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae6c9746/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 1b34430..ad197f1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -57,7 +57,6 @@ class ShuffleScheduler {
};
private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
- private static final int MAX_MAPS_AT_ONCE = 20;
private static final long INITIAL_PENALTY = 2000l; // 2 seconds
private static final float PENALTY_GROWTH_RATE = 1.3f;
@@ -90,6 +89,7 @@ class ShuffleScheduler {
private final long startTime;
private long lastProgressTime;
+ private int maxTaskOutputAtOnce;
private int maxFetchFailuresBeforeReporting;
private boolean reportReadErrorImmediately = true;
private int maxFailedUniqueFetches = 5;
@@ -135,6 +135,9 @@ class ShuffleScheduler {
conf.getBoolean(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+ this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE));
LOG.info("ShuffleScheduler running for sourceVertex: "
+ inputContext.getSourceVertexName() + " with configuration: "
@@ -418,10 +421,10 @@ class ShuffleScheduler {
// find the maps that we still need, up to the limit
while (dedupedItr.hasNext()) {
InputAttemptIdentifier id = dedupedItr.next().getValue();
- result.add(id);
- if (++includedMaps >= MAX_MAPS_AT_ONCE) {
- break;
- }
+ result.add(id);
+ if (++includedMaps >= maxTaskOutputAtOnce) {
+ break;
+ }
}
// put back the maps left after the limit