You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/11/25 22:56:53 UTC

svn commit: r884288 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/task/reduce/ src/test/mapred/org/apache/hadoop/mapred/

Author: cdouglas
Date: Wed Nov 25 21:56:53 2009
New Revision: 884288

URL: http://svn.apache.org/viewvc?rev=884288&view=rev
Log:
MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
configured threshold.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=884288&r1=884287&r2=884288&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Nov 25 21:56:53 2009
@@ -913,3 +913,6 @@
     MAPREDUCE-28. Refactor TestQueueManager and fix default ACLs.
     (V.V.Chaitanya Krishna and Rahul K Singh via sharad)
 
+    MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
+    configured threshold. (cdouglas)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=884288&r1=884287&r2=884288&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Wed Nov 25 21:56:53 2009
@@ -87,12 +87,12 @@
   Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
   
-  private final int memoryLimit;
-  private int usedMemory;
-  private final int maxSingleShuffleLimit;
+  private final long memoryLimit;
+  private long usedMemory;
+  private final long maxSingleShuffleLimit;
   
   private final int memToMemMergeOutputsThreshold; 
-  private final int mergeThreshold;
+  private final long mergeThreshold;
   
   private final int ioSortFactor;
 
@@ -159,17 +159,17 @@
 
     // Allow unit tests to fix Runtime memory
     this.memoryLimit = 
-      (int)(jobConf.getInt(JobContext.REDUCE_MEMORY_TOTAL_BYTES,
-          (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+      (long)(jobConf.getLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES,
+          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
         * maxInMemCopyUse);
  
     this.ioSortFactor = jobConf.getInt(JobContext.IO_SORT_FACTOR, 100);
 
     this.maxSingleShuffleLimit = 
-      (int)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
+      (long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
     this.memToMemMergeOutputsThreshold = 
             jobConf.getInt(JobContext.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
-    this.mergeThreshold = (int)(this.memoryLimit * 
+    this.mergeThreshold = (long)(this.memoryLimit * 
                           jobConf.getFloat(JobContext.SHUFFLE_MERGE_EPRCENT, 
                                            0.90f));
     LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=884288&r1=884287&r2=884288&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Wed Nov 25 21:56:53 2009
@@ -36,7 +36,7 @@
     job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "0.0");
     job.setNumMapTasks(MAP_TASKS);
     job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
-    job.setInt(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
+    job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
     job.setInt(JobContext.IO_SORT_FACTOR, 2);
     job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
@@ -58,7 +58,7 @@
     JobConf job = mrCluster.createJobConf();
     job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "1.0");
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "1.0");
-    job.setInt(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
+    job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.setNumMapTasks(MAP_TASKS);
     Counters c = runJob(job);
     final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=884288&r1=884287&r2=884288&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Wed Nov 25 21:56:53 2009
@@ -85,7 +85,7 @@
     job.setInt(JobContext.SHUFFLE_PARALLEL_COPIES, 1);
     job.setInt(JobContext.IO_SORT_MB, 10);
     job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
-    job.setInt(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
+    job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
     job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
     Counters c = runJob(job);