You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/05/17 05:31:23 UTC
svn commit: r1103990 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/test/mapred/org/apache/hadoop/mapred/
Author: tomwhite
Date: Tue May 17 03:31:22 2011
New Revision: 1103990
URL: http://svn.apache.org/viewvc?rev=1103990&view=rev
Log:
MAPREDUCE-2327. MapTask doesn't need to put username information in SpillRecord. Contributed by Todd Lipcon.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1103990&r1=1103989&r2=1103990&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue May 17 03:31:22 2011
@@ -736,6 +736,9 @@ Release 0.22.0 - Unreleased
MAPREDUCE-2486. Incorrect snapshot dependency published in .pom files
(todd)
+ MAPREDUCE-2327. MapTask doesn't need to put username information in
+ SpillRecord. (todd via tomwhite)
+
Release 0.21.1 - Unreleased
NEW FEATURES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1103990&r1=1103989&r2=1103990&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue May 17 03:31:22 2011
@@ -768,7 +768,8 @@ class MapTask extends Task {
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
private int totalIndexCacheMemory;
- private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+ private int indexCacheMemoryLimit;
+ private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
@@ -783,6 +784,8 @@ class MapTask extends Task {
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
+ indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
+ INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
@@ -1466,7 +1469,7 @@ class MapTask extends Task {
}
}
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1531,7 +1534,7 @@ class MapTask extends Task {
throw e;
}
}
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1647,8 +1650,7 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
- indexCacheList.add(new SpillRecord(indexFileName, job,
- UserGroupInformation.getCurrentUser().getShortUserName()));
+ indexCacheList.add(new SpillRecord(indexFileName, job));
}
//make correction in the length to include the sequence file header
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1103990&r1=1103989&r2=1103990&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java Tue May 17 03:31:22 2011
@@ -50,6 +50,10 @@ class SpillRecord {
entries = buf.asLongBuffer();
}
+ public SpillRecord(Path indexFileName, JobConf job) throws IOException {
+ this(indexFileName, job, null);
+ }
+
public SpillRecord(Path indexFileName, JobConf job, String expectedIndexOwner)
throws IOException {
this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1103990&r1=1103989&r2=1103990&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue May 17 03:31:22 2011
@@ -128,6 +128,8 @@ public interface MRJobConfig {
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+ public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1103990&r1=1103989&r2=1103990&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Tue May 17 03:31:22 2011
@@ -120,4 +120,27 @@ public class TestMiniMRWithDFSWithDistin
runJobAsUser(job2, BOB_UGI);
}
+ /**
+ * Regression test for MAPREDUCE-2327. Verifies that a map task
+ * succeeds even if it makes lots of spills (more than fit in the
+ * spill index cache).
+ */
+ public void testMultipleSpills() throws Exception {
+ JobConf job1 = mr.createJobConf();
+
+ // Force multiple spills: tiny spill threshold on a 1 MB sort buffer
+ job1.setFloat(MRJobConfig.MAP_SORT_SPILL_PERCENT, 0.0001f);
+ job1.setInt(MRJobConfig.IO_SORT_MB, 1);
+
+ // Make sure the spill records don't fit in index cache
+ job1.setInt(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, 0);
+
+ String input = "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n";
+ Path inDir = new Path("/testing/distinct/input");
+ Path outDir = new Path("/user/alice/output");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
+ input, 2, 1, inDir, outDir);
+ runJobAsUser(job1, ALICE_UGI);
+ }
}