Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/05/17 05:39:32 UTC

svn commit: r1103991 - in /hadoop/mapreduce/branches/branch-0.22: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapred/

Author: tomwhite
Date: Tue May 17 03:39:31 2011
New Revision: 1103991

URL: http://svn.apache.org/viewvc?rev=1103991&view=rev
Log:
Merge -r 1103989:1103990 from trunk to branch-0.22. Fixes: MAPREDUCE-2327

Modified:
    hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/SpillRecord.java
    hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/mapreduce/branches/branch-0.22/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java

Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1103991&r1=1103990&r2=1103991&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Tue May 17 03:39:31 2011
@@ -560,6 +560,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2486. Incorrect snapshot dependency published in .pom files
     (todd)
 
+    MAPREDUCE-2327. MapTask doesn't need to put username information in
+    SpillRecord. (todd via tomwhite)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1103991&r1=1103990&r2=1103991&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/MapTask.java Tue May 17 03:39:31 2011
@@ -768,7 +768,8 @@ class MapTask extends Task {
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
     private int totalIndexCacheMemory;
-    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+    private int indexCacheMemoryLimit;
+    private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
 
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
@@ -783,6 +784,8 @@ class MapTask extends Task {
       final float spillper =
         job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
       final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
+      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
+                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
       if (spillper > (float)1.0 || spillper <= (float)0.0) {
         throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
             "\": " + spillper);
@@ -1466,7 +1469,7 @@ class MapTask extends Task {
           }
         }
 
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
           // create spill index file
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1531,7 +1534,7 @@ class MapTask extends Task {
             throw e;
           }
         }
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
           // create spill index file
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1647,8 +1650,7 @@ class MapTask extends Task {
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
         Path indexFileName = mapOutputFile.getSpillIndexFile(i);
-        indexCacheList.add(new SpillRecord(indexFileName, job,
-            UserGroupInformation.getCurrentUser().getShortUserName()));
+        indexCacheList.add(new SpillRecord(indexFileName, job));
       }
 
       //make correction in the length to include the sequence file header

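For readers skimming the hunks above: the change replaces the hard-coded 1 MB
cap on in-memory spill index records with a per-job setting. A minimal
standalone sketch of the resulting read-with-default pattern follows; the
wrapper class is invented for illustration, while the key and default come
straight from the diff.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class IndexCacheLimitSketch {
      // Matches the old hard-coded constant: 1 MB.
      static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;

      static int indexCacheMemoryLimit(JobConf job) {
        // Tunable per job after this commit; falls back to the old default
        // when mapreduce.task.index.cache.limit.bytes is unset.
        return job.getInt(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT,
                          INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      }
    }
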
Modified: hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1103991&r1=1103990&r2=1103991&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapred/SpillRecord.java Tue May 17 03:39:31 2011
@@ -50,6 +50,10 @@ class SpillRecord {
     entries = buf.asLongBuffer();
   }
 
+  public SpillRecord(Path indexFileName, JobConf job) throws IOException {
+    this(indexFileName, job, null);
+  }
+
   public SpillRecord(Path indexFileName, JobConf job, String expectedIndexOwner)
     throws IOException {
     this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);

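The new two-arg constructor forwards a null expectedIndexOwner, presumably
meaning no ownership check is requested: the map task reading back its own
spill index no longer needs the UserGroupInformation lookup removed from
MapTask above. A hedged sketch of a call site, assuming it lives in the
org.apache.hadoop.mapred package since SpillRecord is package-private:

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    class SpillRecordReadSketch {
      static SpillRecord readIndex(Path indexFile, JobConf job)
          throws IOException {
        // No expected owner argument: the task is reading an index file
        // it wrote itself, so no username needs to be supplied.
        return new SpillRecord(indexFile, job);
      }
    }
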
Modified: hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1103991&r1=1103990&r2=1103991&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue May 17 03:39:31 2011
@@ -126,6 +126,8 @@ public interface MRJobConfig {
 
   public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
 
+  public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
   public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
 
   public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";

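A job can override the new key like any other MRJobConfig setting. A minimal
sketch, assuming a plain driver; the 4 MB value is illustrative, not part of
this commit:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class TuneIndexCacheLimit {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // mapreduce.task.index.cache.limit.bytes; MapTask falls back to
        // its built-in 1 MB default when this is unset.
        job.setInt(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, 4 * 1024 * 1024);
      }
    }
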
Modified: hadoop/mapreduce/branches/branch-0.22/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1103991&r1=1103990&r2=1103991&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Tue May 17 03:39:31 2011
@@ -120,4 +120,27 @@ public class TestMiniMRWithDFSWithDistin
     runJobAsUser(job2, BOB_UGI);
   }
 
+  /**
+   * Regression test for MAPREDUCE-2327. Verifies that a map task
+   * succeeds even if it makes lots of spills (more than fit in the
+   * spill index cache).
+   */
+  public void testMultipleSpills() throws Exception {
+    JobConf job1 = mr.createJobConf();
+
+    // Make sure it spills twice
+    job1.setFloat(MRJobConfig.MAP_SORT_SPILL_PERCENT, 0.0001f);
+    job1.setInt(MRJobConfig.IO_SORT_MB, 1);
+
+    // Make sure the spill records don't fit in index cache
+    job1.setInt(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, 0);
+
+    String input = "The quick brown fox\nhas many silly\n" 
+      + "red fox sox\n";
+    Path inDir = new Path("/testing/distinct/input");
+    Path outDir = new Path("/user/alice/output");
+    TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, 
+                                           input, 2, 1, inDir, outDir);
+    runJobAsUser(job1, ALICE_UGI);
+  }
 }