You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/04 22:05:06 UTC
[47/50] [abbrv] hadoop git commit: MAPREDUCE-5649. Reduce cannot use
more than 2G memory for the final merge. Contributed by Gera Shegalov
MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa5c6f40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa5c6f40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa5c6f40
Branch: refs/heads/YARN-2928
Commit: aa5c6f409510d26deed943c8ce0863c3056a13cf
Parents: f58d6cf
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 4 19:02:39 2015 +0000
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon May 4 12:59:01 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../mapreduce/task/reduce/MergeManagerImpl.java | 47 +++++++++++---------
.../mapreduce/task/reduce/TestMergeManager.java | 29 ++++++++++++
3 files changed, 57 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa5c6f40/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 117aafa..8a62f92 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -383,6 +383,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6349. Fix typo in property org.apache.hadoop.mapreduce.
lib.chain.Chain.REDUCER_INPUT_VALUE_CLASS. (Ray Chiang via ozawa)
+ MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+ (Gera Shegalov via jlowe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa5c6f40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 8bf17ef..f788707 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
private final OnDiskMerger onDiskMerger;
-
- private final long memoryLimit;
+
+ @VisibleForTesting
+ final long memoryLimit;
+
private long usedMemory;
private long commitMemory;
private final long maxSingleShuffleLimit;
@@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
}
// Allow unit tests to fix Runtime memory
- this.memoryLimit =
- (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
- * maxInMemCopyUse);
-
+ this.memoryLimit = (long)(jobConf.getLong(
+ MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+ Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
final float singleShuffleMemoryLimitPercent =
@@ -202,7 +203,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
throw new RuntimeException("Invalid configuration: "
- + "maxSingleShuffleLimit should be less than mergeThreshold"
+ + "maxSingleShuffleLimit should be less than mergeThreshold "
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+ "mergeThreshold: " + this.mergeThreshold);
}
@@ -668,24 +669,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
}
}
- private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
- List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
- List<CompressAwarePath> onDiskMapOutputs
- ) throws IOException {
- LOG.info("finalMerge called with " +
- inMemoryMapOutputs.size() + " in-memory map-outputs and " +
- onDiskMapOutputs.size() + " on-disk map-outputs");
-
+ @VisibleForTesting
+ final long getMaxInMemReduceLimit() {
final float maxRedPer =
- job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+ jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
- throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
- maxRedPer);
+ throw new RuntimeException(maxRedPer + ": "
+ + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+ + " must be a float between 0 and 1.0");
}
- int maxInMemReduce = (int)Math.min(
- Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-
+ return (long)(memoryLimit * maxRedPer);
+ }
+ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+ List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+ List<CompressAwarePath> onDiskMapOutputs
+ ) throws IOException {
+ LOG.info("finalMerge called with " +
+ inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+ onDiskMapOutputs.size() + " on-disk map-outputs");
+ final long maxInMemReduce = getMaxInMemReduceLimit();
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa5c6f40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 8d6bab9..ef860af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -260,4 +260,33 @@ public class TestMergeManager {
}
}
+
+ @Test
+ public void testLargeMemoryLimits() throws Exception {
+ final JobConf conf = new JobConf();
+ // Xmx in production
+ conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+ 8L * 1024 * 1024 * 1024);
+
+ // M1 = Xmx fraction for map outputs
+ conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+
+ // M2 = max M1 fraction for a single map output
+ conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+
+ // M3 = M1 fraction at which in memory merge is triggered
+ conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+
+ // M4 = M1 fraction of map outputs remaining in memory for a reduce
+ conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+
+ final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
+ null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+ null, null, null, null, null, new MROutputFiles());
+ assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+ mgr.memoryLimit > Integer.MAX_VALUE);
+ final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+ assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+ maxInMemReduce > Integer.MAX_VALUE);
+ }
}