You are viewing a plain text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/03/23 01:08:48 UTC
svn commit: r926411 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Author: cdouglas
Date: Tue Mar 23 00:08:48 2010
New Revision: 926411
URL: http://svn.apache.org/viewvc?rev=926411&view=rev
Log:
MAPREDUCE-1480. Correctly initialize child RecordReaders in
CombineFileInputFormat. Contributed by Aaron Kimball
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar 23 00:08:48 2010
@@ -483,6 +483,9 @@ Trunk (unreleased changes)
MAPREDUCE-1420. Fix TestTTResourceReporting failure. (Scott Chen via
cdouglas)
+ MAPREDUCE-1480. Correctly initialize child RecordReaders in
+ CombineFileInputFormat. (Aaron Kimball via cdouglas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java Tue Mar 23 00:08:48 2010
@@ -56,6 +56,9 @@ public class CombineFileRecordReader<K,
TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (CombineFileSplit)split;
this.context = context;
+ if (null != this.curReader) {
+ this.curReader.initialize(split, context);
+ }
}
public boolean nextKeyValue() throws IOException, InterruptedException {
@@ -86,8 +89,13 @@ public class CombineFileRecordReader<K,
/**
* return progress based on the amount of data processed so far.
*/
- public float getProgress() throws IOException {
- return Math.min(1.0f, progress/(float)(split.getLength()));
+ public float getProgress() throws IOException, InterruptedException {
+ long subprogress = 0; // bytes processed in current split
+ if (null != curReader) {
+ // idx is always one past the current subsplit's true index.
+ subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
+ }
+ return Math.min(1.0f, (progress + subprogress)/(float)(split.getLength()));
}
/**
@@ -135,14 +143,20 @@ public class CombineFileRecordReader<K,
// get a record reader for the idx-th chunk
try {
- curReader = rrConstructor.newInstance(new Object []
- {split, context, Integer.valueOf(idx)});
-
Configuration conf = context.getConfiguration();
// setup some helper config variables.
conf.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
conf.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
conf.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
+
+ curReader = rrConstructor.newInstance(new Object []
+ {split, context, Integer.valueOf(idx)});
+
+ if (idx > 0) {
+ // initialize() for the first RecordReader will be called by MapTask;
+ // we're responsible for initializing subsequent RecordReaders.
+ curReader.initialize(split, context);
+ }
} catch (Exception e) {
throw new RuntimeException (e);
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Tue Mar 23 00:08:48 2010
@@ -33,13 +33,17 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-public class TestCombineFileInputFormat extends TestCase{
+public class TestCombineFileInputFormat extends TestCase {
private static final String rack1[] = new String[] {
"/r1"
@@ -96,6 +100,147 @@ public class TestCombineFileInputFormat
}
}
+ private static final String DUMMY_KEY = "dummy.rr.key";
+
+ private static class DummyRecordReader extends RecordReader<Text, Text> {
+ private TaskAttemptContext context;
+ private CombineFileSplit s;
+ private int idx;
+ private boolean used;
+
+ public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
+ Integer i) {
+ this.context = context;
+ this.idx = i;
+ this.s = split;
+ this.used = true;
+ }
+
+ /** @return a value specified in the context to check whether the
+ * context is properly updated by the initialize() method.
+ */
+ public String getDummyConfVal() {
+ return this.context.getConfiguration().get(DUMMY_KEY);
+ }
+
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ this.context = context;
+ this.s = (CombineFileSplit) split;
+
+ // By setting used to true in the c'tor, but false in initialize,
+ // we can check that initialize() is always called before use
+ // (e.g., in testReinit()).
+ this.used = false;
+ }
+
+ public boolean nextKeyValue() {
+ boolean ret = !used;
+ this.used = true;
+ return ret;
+ }
+
+ public Text getCurrentKey() {
+ return new Text(this.context.getConfiguration().get(DUMMY_KEY));
+ }
+
+ public Text getCurrentValue() {
+ return new Text(this.s.getPath(idx).toString());
+ }
+
+ public float getProgress() {
+ return used ? 1.0f : 0.0f;
+ }
+
+ public void close() {
+ }
+ }
+
+ /** Extend CFIF to use CFRR with DummyRecordReader */
+ private class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public RecordReader<Text,Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ return new CombineFileRecordReader((CombineFileSplit) split, context,
+ (Class) DummyRecordReader.class);
+ }
+ }
+
+ public void testRecordReaderInit() throws InterruptedException, IOException {
+ // Test that we properly initialize the child recordreader when
+ // CombineFileInputFormat and CombineFileRecordReader are used.
+
+ TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+ Configuration conf1 = new Configuration();
+ conf1.set(DUMMY_KEY, "STATE1");
+ TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);
+
+ // This will create a CombineFileRecordReader that itself contains a
+ // DummyRecordReader.
+ InputFormat inputFormat = new ChildRRInputFormat();
+
+ Path [] files = { new Path("file1") };
+ long [] lengths = { 1 };
+
+ CombineFileSplit split = new CombineFileSplit(files, lengths);
+
+ RecordReader rr = inputFormat.createRecordReader(split, context1);
+ assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+ // Verify that the initial configuration is the one being used.
+ // Right after construction the dummy key should have value "STATE1"
+ assertEquals("Invalid initial dummy key value", "STATE1",
+ rr.getCurrentKey().toString());
+
+ // Switch the active context for the RecordReader...
+ Configuration conf2 = new Configuration();
+ conf2.set(DUMMY_KEY, "STATE2");
+ TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
+ rr.initialize(split, context2);
+
+ // And verify that the new context is updated into the child record reader.
+ assertEquals("Invalid secondary dummy key value", "STATE2",
+ rr.getCurrentKey().toString());
+ }
+
+ public void testReinit() throws Exception {
+ // Test that a split containing multiple files works correctly,
+ // with the child RecordReader getting its initialize() method
+ // called a second time.
+ TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+ Configuration conf = new Configuration();
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
+
+ // This will create a CombineFileRecordReader that itself contains a
+ // DummyRecordReader.
+ InputFormat inputFormat = new ChildRRInputFormat();
+
+ Path [] files = { new Path("file1"), new Path("file2") };
+ long [] lengths = { 1, 1 };
+
+ CombineFileSplit split = new CombineFileSplit(files, lengths);
+ RecordReader rr = inputFormat.createRecordReader(split, context);
+ assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+ // first initialize() call comes from MapTask. We'll do it here.
+ rr.initialize(split, context);
+
+ // First value is first filename.
+ assertTrue(rr.nextKeyValue());
+ assertEquals("file1", rr.getCurrentValue().toString());
+
+ // The inner RR will return false, because it only emits one (k, v) pair.
+ // But there's another sub-split to process. This returns true to us.
+ assertTrue(rr.nextKeyValue());
+
+ // And the 2nd rr will have its initialize method called correctly.
+ assertEquals("file2", rr.getCurrentValue().toString());
+
+ // But after both child RR's have returned their singleton (k, v), this
+ // should also return false.
+ assertFalse(rr.nextKeyValue());
+ }
+
public void testSplitPlacement() throws IOException {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;