Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:51:21 UTC

svn commit: r1077201 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ReduceTask.java test/org/apache/hadoop/mapred/TestReduceFetch.java

Author: omalley
Date: Fri Mar  4 03:51:21 2011
New Revision: 1077201

URL: http://svn.apache.org/viewvc?rev=1077201&view=rev
Log:
commit 032e238d85346be92d7766ae58151f97c822015f
Author: Chris Douglas <cd...@apache.org>
Date:   Mon Feb 22 22:26:08 2010 -0800

    Revert MAPREDUCE:433

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077201&r1=1077200&r2=1077201&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:51:21 2011
@@ -1011,10 +1011,9 @@ class ReduceTask extends Task {
           throw new IOException("mapred.job.shuffle.input.buffer.percent" +
                                 maxInMemCopyUse);
         }
-        // Allow unit tests to fix Runtime memory
-        maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
-            (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-          * maxInMemCopyUse);
+        maxSize = (long)Math.min(
+            Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
+            Integer.MAX_VALUE);
         maxSingleShuffleLimit = (long)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1077201&r1=1077200&r2=1077201&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Fri Mar  4 03:51:21 2011
@@ -101,53 +101,48 @@ public class TestReduceFetch extends Tes
   }
 
   public void testReduceFromDisk() throws Exception {
-    final int MAP_TASKS = 8;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(MAP_TASKS);
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.setInt("io.sort.factor", 2);
-    job.setInt("mapred.inmem.merge.threshold", 4);
+    job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
-    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
-    assertTrue("Expected all records spilled during reduce (" + spill + ")",
-        spill >= 2 * out); // all records spill at map, reduce
-    assertTrue("Expected intermediate merges (" + spill + ")",
-        spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
+    assertTrue("Expected more bytes read from local (" +
+        localRead + ") than written to HDFS (" + hdfsWritten + ")",
+        hdfsWritten <= localRead);
   }
 
   public void testReduceFromPartialMem() throws Exception {
-    final int MAP_TASKS = 5;
     JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(MAP_TASKS);
+    job.setNumMapTasks(5);
     job.setInt("mapred.inmem.merge.threshold", 0);
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setInt("mapred.reduce.parallel.copies", 1);
     job.setInt("io.sort.mb", 10);
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
-    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
-    assertTrue("Expected some records not spilled during reduce" + spill + ")",
-        spill < 2 * out); // spilled map records, some records at the reduce
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
+    assertTrue("Expected at least 1MB fewer bytes read from local (" +
+        localRead + ") than written to HDFS (" + hdfsWritten + ")",
+        hdfsWritten >= localRead + 1024 * 1024);
   }
 
   public void testReduceFromMem() throws Exception {
-    final int MAP_TASKS = 3;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.setNumMapTasks(MAP_TASKS);
+    job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
-    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
-    assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
+    assertTrue("Non-zero read from local: " + localRead, localRead == 0);
   }
 
 }
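
The reverted tests check per-filesystem byte counters rather than the SPILLED_RECORDS / MAP_OUTPUT_RECORDS comparisons of the replaced version, and they bound reduce-side memory with JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS ("-Xmx128m") instead of mapred.job.reduce.total.mem.bytes. As a reading aid only, a small sketch of the counter lookups follows; the [0] = bytes-read / [1] = bytes-written indexing of Task.getFileSystemCounterNames() is inferred from the assertion messages above, and the class name is made up for illustration.

    // Sketch only -- not part of the patch. Lives in org.apache.hadoop.mapred,
    // like the test, so the package-visible Task members resolve.
    package org.apache.hadoop.mapred;

    class ReduceFetchCounterSketch {
      // Bytes the reduce read back from local disk during shuffle/merge.
      static long localBytesRead(Counters c) {
        return c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
            Task.getFileSystemCounterNames("file")[0]).getCounter();
      }

      // Bytes the job wrote to HDFS (the reduce output).
      static long hdfsBytesWritten(Counters c) {
        return c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
            Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
      }
    }

With these helpers the three expectations read: testReduceFromDisk wants localBytesRead >= hdfsBytesWritten (every record spills to local disk at the reducer), testReduceFromPartialMem wants localBytesRead at least 1MB below hdfsBytesWritten, and testReduceFromMem wants localBytesRead == 0 (the reduce merges entirely in memory).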