You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ge...@apache.org on 2016/08/02 06:39:01 UTC
hadoop git commit: MAPREDUCE-6724. Single shuffle to memory must not
exceed Integer#MAX_VALUE. (Haibo Chen via gera)
Repository: hadoop
Updated Branches:
refs/heads/trunk c4463f2ef -> 6890d5b47
MAPREDUCE-6724. Single shuffle to memory must not exceed Integer#MAX_VALUE. (Haibo Chen via gera)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6890d5b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6890d5b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6890d5b4
Branch: refs/heads/trunk
Commit: 6890d5b472320fa7592ed1b08b623c55a27089c6
Parents: c4463f2
Author: Gera Shegalov <ge...@apache.org>
Authored: Thu Jul 28 14:37:03 2016 -0700
Committer: Gera Shegalov <ge...@apache.org>
Committed: Mon Aug 1 23:35:47 2016 -0700
----------------------------------------------------------------------
.../mapreduce/task/reduce/MergeManagerImpl.java | 26 +++++++++++---------
.../mapreduce/task/reduce/TestMergeManager.java | 23 +++++++++++------
2 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6890d5b4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 1673ff8..09fe0cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -99,7 +99,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
private long usedMemory;
private long commitMemory;
- private final long maxSingleShuffleLimit;
+
+ @VisibleForTesting
+ final long maxSingleShuffleLimit;
private final int memToMemMergeOutputsThreshold;
private final long mergeThreshold;
@@ -187,10 +189,16 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
usedMemory = 0L;
commitMemory = 0L;
- this.maxSingleShuffleLimit =
- (long)(memoryLimit * singleShuffleMemoryLimitPercent);
- this.memToMemMergeOutputsThreshold =
- jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
+ long maxSingleShuffleLimitConfiged =
+ (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+ if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) {
+ maxSingleShuffleLimitConfiged = Integer.MAX_VALUE;
+ LOG.info("The max number of bytes for a single in-memory shuffle cannot" +
+ " be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
+ }
+ this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged;
+ this.memToMemMergeOutputsThreshold =
+ jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
this.mergeThreshold = (long)(this.memoryLimit *
jobConf.getFloat(
MRJobConfig.SHUFFLE_MERGE_PERCENT,
@@ -249,17 +257,13 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
public void waitForResource() throws InterruptedException {
inMemoryMerger.waitForMerge();
}
-
- private boolean canShuffleToMemory(long requestedSize) {
- return (requestedSize < maxSingleShuffleLimit);
- }
-
+
@Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
long requestedSize,
int fetcher
) throws IOException {
- if (!canShuffleToMemory(requestedSize)) {
+ if (requestedSize > maxSingleShuffleLimit) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6890d5b4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 1c0d25b..325d2f9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -289,22 +289,29 @@ public class TestMergeManager {
final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
maxInMemReduce > Integer.MAX_VALUE);
+ assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE",
+ Integer.MAX_VALUE, mgr.maxSingleShuffleLimit);
+ verifyReservedMapOutputType(mgr, 10L, "MEMORY");
+ verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK");
+ }
+
+ private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mgr,
+ long size, String expectedShuffleMode) throws IOException {
+ final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1");
+ final MapOutput<Text, Text> mapOutput = mgr.reserve(mapId, size, 1);
+ assertEquals("Shuffled bytes: " + size, expectedShuffleMode,
+ mapOutput.getDescription());
+ mgr.unreserve(size);
}
@Test
public void testZeroShuffleMemoryLimitPercent() throws Exception {
final JobConf jobConf = new JobConf();
jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f);
- final MergeManager<Text, Text> mgr =
+ final MergeManagerImpl<Text, Text> mgr =
new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class),
null, null, null, null, null, null, null, null, null, null,
new MROutputFiles());
- final long mapOutputSize = 10;
- final int fetcher = 1;
- final MapOutput<Text, Text> mapOutput = mgr.reserve(
- TaskAttemptID.forName("attempt_0_1_m_1_1"),
- mapOutputSize, fetcher);
- assertEquals("Tiny map outputs should be shuffled to disk", "DISK",
- mapOutput.getDescription());
+ verifyReservedMapOutputType(mgr, 10L, "DISK");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org