You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/03/23 01:08:48 UTC

svn commit: r926411 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Author: cdouglas
Date: Tue Mar 23 00:08:48 2010
New Revision: 926411

URL: http://svn.apache.org/viewvc?rev=926411&view=rev
Log:
MAPREDUCE-1480. Correctly initialize child RecordReaders in
CombineFileInputFormat. Contributed by Aaron Kimball

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar 23 00:08:48 2010
@@ -483,6 +483,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1420. Fix TestTTResourceReporting failure. (Scott Chen via
     cdouglas)
 
+    MAPREDUCE-1480. Correctly initialize child RecordReaders in
+    CombineFileInputFormat. (Aaron Kimball via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java Tue Mar 23 00:08:48 2010
@@ -56,6 +56,9 @@ public class CombineFileRecordReader<K, 
       TaskAttemptContext context) throws IOException, InterruptedException {
     this.split = (CombineFileSplit)split;
     this.context = context;
+    if (null != this.curReader) {
+      this.curReader.initialize(split, context);
+    }
   }
   
   public boolean nextKeyValue() throws IOException, InterruptedException {
@@ -86,8 +89,13 @@ public class CombineFileRecordReader<K, 
   /**
    * return progress based on the amount of data processed so far.
    */
-  public float getProgress() throws IOException {
-    return Math.min(1.0f,  progress/(float)(split.getLength()));
+  public float getProgress() throws IOException, InterruptedException {
+    long subprogress = 0;    // bytes processed in current split
+    if (null != curReader) {
+      // idx is always one past the current subsplit's true index.
+      subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
+    }
+    return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
   }
   
   /**
@@ -135,14 +143,20 @@ public class CombineFileRecordReader<K, 
 
     // get a record reader for the idx-th chunk
     try {
-      curReader =  rrConstructor.newInstance(new Object [] 
-                            {split, context, Integer.valueOf(idx)});
-
       Configuration conf = context.getConfiguration();
       // setup some helper config variables.
       conf.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
       conf.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
       conf.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
+
+      curReader =  rrConstructor.newInstance(new Object [] 
+                            {split, context, Integer.valueOf(idx)});
+
+      if (idx > 0) {
+        // initialize() for the first RecordReader will be called by MapTask;
+        // we're responsible for initializing subsequent RecordReaders.
+        curReader.initialize(split, context);
+      }
     } catch (Exception e) {
       throw new RuntimeException (e);
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=926411&r1=926410&r2=926411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Tue Mar 23 00:08:48 2010
@@ -33,13 +33,17 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
-public class TestCombineFileInputFormat extends TestCase{
+public class TestCombineFileInputFormat extends TestCase {
 
   private static final String rack1[] = new String[] {
     "/r1"
@@ -96,6 +100,147 @@ public class TestCombineFileInputFormat 
     }
   }
 
+  private static final String DUMMY_KEY = "dummy.rr.key";
+
+  private static class DummyRecordReader extends RecordReader<Text, Text> {
+    private TaskAttemptContext context;
+    private CombineFileSplit s;
+    private int idx;
+    private boolean used;
+
+    public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
+        Integer i) {
+      this.context = context;
+      this.idx = i;
+      this.s = split;
+      this.used = true;
+    }
+
+    /** @return a value specified in the context to check whether the
+     * context is properly updated by the initialize() method.
+     */
+    public String getDummyConfVal() {
+      return this.context.getConfiguration().get(DUMMY_KEY);
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      this.context = context;
+      this.s = (CombineFileSplit) split;
+
+      // By setting used to true in the c'tor, but false in initialize,
+      // we can check that initialize() is always called before use
+      // (e.g., in testReinit()).
+      this.used = false;
+    }
+
+    public boolean nextKeyValue() {
+      boolean ret = !used;
+      this.used = true;
+      return ret;
+    }
+
+    public Text getCurrentKey() {
+      return new Text(this.context.getConfiguration().get(DUMMY_KEY));
+    }
+
+    public Text getCurrentValue() {
+      return new Text(this.s.getPath(idx).toString());
+    }
+
+    public float getProgress() {
+      return used ? 1.0f : 0.0f;
+    }
+
+    public void close() {
+    }
+  }
+
+  /** Extend CFIF to use CFRR with DummyRecordReader */
+  private class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
+        TaskAttemptContext context) throws IOException {
+      return new CombineFileRecordReader((CombineFileSplit) split, context,
+          (Class) DummyRecordReader.class);
+    }
+  }
+
+  public void testRecordReaderInit() throws InterruptedException, IOException {
+    // Test that we properly initialize the child recordreader when
+    // CombineFileInputFormat and CombineFileRecordReader are used.
+
+    TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    Configuration conf1 = new Configuration();
+    conf1.set(DUMMY_KEY, "STATE1");
+    TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);
+
+    // This will create a CombineFileRecordReader that itself contains a
+    // DummyRecordReader.
+    InputFormat inputFormat = new ChildRRInputFormat();
+
+    Path [] files = { new Path("file1") };
+    long [] lengths = { 1 };
+
+    CombineFileSplit split = new CombineFileSplit(files, lengths);
+
+    RecordReader rr = inputFormat.createRecordReader(split, context1);
+    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+    // Verify that the initial configuration is the one being used.
+    // Right after construction the dummy key should have value "STATE1"
+    assertEquals("Invalid initial dummy key value", "STATE1",
+      rr.getCurrentKey().toString());
+
+    // Switch the active context for the RecordReader...
+    Configuration conf2 = new Configuration();
+    conf2.set(DUMMY_KEY, "STATE2");
+    TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
+    rr.initialize(split, context2);
+
+    // And verify that the new context is updated into the child record reader.
+    assertEquals("Invalid secondary dummy key value", "STATE2",
+      rr.getCurrentKey().toString());
+  }
+
+  public void testReinit() throws Exception {
+    // Test that a split containing multiple files works correctly,
+    // with the child RecordReader getting its initialize() method
+    // called a second time.
+    TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    Configuration conf = new Configuration();
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
+
+    // This will create a CombineFileRecordReader that itself contains a
+    // DummyRecordReader.
+    InputFormat inputFormat = new ChildRRInputFormat();
+
+    Path [] files = { new Path("file1"), new Path("file2") };
+    long [] lengths = { 1, 1 };
+
+    CombineFileSplit split = new CombineFileSplit(files, lengths);
+    RecordReader rr = inputFormat.createRecordReader(split, context);
+    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+    // first initialize() call comes from MapTask. We'll do it here.
+    rr.initialize(split, context);
+
+    // First value is first filename.
+    assertTrue(rr.nextKeyValue());
+    assertEquals("file1", rr.getCurrentValue().toString());
+
+    // The inner RR will return false, because it only emits one (k, v) pair.
+    // But there's another sub-split to process. This returns true to us.
+    assertTrue(rr.nextKeyValue());
+    
+    // And the 2nd rr will have its initialize method called correctly.
+    assertEquals("file2", rr.getCurrentValue().toString());
+    
+    // But after both child RR's have returned their singleton (k, v), this
+    // should also return false.
+    assertFalse(rr.nextKeyValue());
+  }
+
   public void testSplitPlacement() throws IOException {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;