Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/11/09 09:01:05 UTC

svn commit: r833993 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/

Author: sharad
Date: Mon Nov  9 08:01:04 2009
New Revision: 833993

URL: http://svn.apache.org/viewvc?rev=833993&view=rev
Log:
MAPREDUCE-1178. Merge revision 833990 from trunk.

Added:
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=833993&r1=833992&r2=833993&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Mon Nov  9 08:01:04 2009
@@ -803,3 +803,7 @@
 
     MAPREDUCE-1177. Correct setup/cleanup inversion in
     JobTracker::getTaskReports. (Vinod Kumar Vavilapalli via cdouglas)
+
+    MAPREDUCE-1178. Fix ClassCastException in MultipleInputs by adding 
+    a DelegatingRecordReader. (Amareshwari Sriramadasu and Jay Booth 
+    via sharad)
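
For context: MultipleInputs routes every path added through it into a single
DelegatingInputFormat, which wraps each split in a TaggedInputSplit carrying
that path's InputFormat and Mapper classes. A minimal sketch of the kind of
job setup affected by this fix (the path and mapper names here are
illustrative, not part of the commit):

    Job job = new Job(conf);
    MultipleInputs.addInputPath(job, new Path("/logs"),
        TextInputFormat.class, LogMapper.class);
    MultipleInputs.addInputPath(job, new Path("/lookup"),
        KeyValueTextInputFormat.class, LookupMapper.class);
    // Before this fix, such a job could fail with a ClassCastException as
    // soon as the framework initialized the underlying record reader with
    // the wrapping TaggedInputSplit (see the diff below).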

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java?rev=833993&r1=833992&r2=833993&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java Mon Nov  9 08:01:04 2009
@@ -119,17 +119,9 @@
     return splits;
   }
 
-  @SuppressWarnings("unchecked")
+  @Override
   public RecordReader<K, V> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
-
-    // Find the InputFormat and then the RecordReader from the
-    // TaggedInputSplit.
-    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
-      .newInstance(taggedInputSplit.getInputFormatClass(),
-         context.getConfiguration());
-    return inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
-      context);
+    return new DelegatingRecordReader<K, V>(split, context);
   }
 }
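
The change above is the heart of the fix. The framework initializes whatever
reader createRecordReader returns by passing it the original split, which for
MultipleInputs is still the TaggedInputSplit. Roughly (simplified, not the
literal MapTask code):

    RecordReader<K, V> reader = format.createRecordReader(split, context);
    reader.initialize(split, context); // split is the TaggedInputSplit here
    // An underlying reader such as LineRecordReader then attempts
    //   FileSplit fileSplit = (FileSplit) split;
    // and fails with a ClassCastException.

Returning a DelegatingRecordReader instead lets its initialize() unwrap the
TaggedInputSplit before handing it to the underlying reader, as shown in the
new file below.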

Added: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java?rev=833993&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java Mon Nov  9 08:01:04 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A delegating RecordReader that forwards all calls to the underlying
+ * record reader defined by the {@link TaggedInputSplit}.
+ */
+public class DelegatingRecordReader<K, V> extends RecordReader<K, V> {
+  RecordReader<K, V> originalRR;
+
+  /**
+   * Constructs the DelegatingRecordReader.
+   * 
+   * @param split TaggedInputSplit object
+   * @param context TaskAttemptContext object
+   *  
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+        .newInstance(taggedInputSplit.getInputFormatClass(), context
+            .getConfiguration());
+    originalRR = inputFormat.createRecordReader(taggedInputSplit
+        .getInputSplit(), context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    originalRR.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return originalRR.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return originalRR.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return originalRR.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    originalRR.initialize(((TaggedInputSplit) split).getInputSplit(), context);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return originalRR.nextKeyValue();
+  }
+
+}

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java?rev=833993&r1=833992&r2=833993&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java Mon Nov  9 08:01:04 2009
@@ -17,20 +17,128 @@
  */
 package org.apache.hadoop.mapreduce.lib.input;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Map;
 
-import junit.framework.TestCase;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * @see TestDelegatingInputFormat
  */
-public class TestMultipleInputs extends TestCase {
+public class TestMultipleInputs extends HadoopTestCase {
+
+  public TestMultipleInputs() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  private static final Path ROOT_DIR = new Path("testing/mo");
+  private static final Path IN1_DIR = new Path(ROOT_DIR, "input1");
+  private static final Path IN2_DIR = new Path(ROOT_DIR, "input2");
+  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+  private Path getDir(Path dir) {
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+          .replace(' ', '+');
+      dir = new Path(localPathRoot, dir);
+    }
+    return dir;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Path rootDir = getDir(ROOT_DIR);
+    Path in1Dir = getDir(IN1_DIR);
+    Path in2Dir = getDir(IN2_DIR);
+
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(rootDir, true);
+    if (!fs.mkdirs(in1Dir)) {
+      throw new IOException("Mkdirs failed to create " + in1Dir.toString());
+    }
+    if (!fs.mkdirs(in2Dir)) {
+      throw new IOException("Mkdirs failed to create " + in2Dir.toString());
+    }
+  }
+
+  @Test
+  public void testDoMultipleInputs() throws IOException {
+    Path in1Dir = getDir(IN1_DIR);
+    Path in2Dir = getDir(IN2_DIR);
+
+    Path outDir = getDir(OUT_DIR);
+
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir, true);
+
+    DataOutputStream file1 = fs.create(new Path(in1Dir, "part-0"));
+    file1.writeBytes("a\nb\nc\nd\ne");
+    file1.close();
+
+    // write tab-delimited records to the second file because it is read
+    // with KeyValueTextInputFormat
+    DataOutputStream file2 = fs.create(new Path(in2Dir, "part-0"));
+    file2.writeBytes("a\tblah\nb\tblah\nc\tblah\nd\tblah\ne\tblah");
+    file2.close();
+
+    Job job = new Job(conf);
+    job.setJobName("mi");
+
+    MultipleInputs.addInputPath(job, in1Dir, TextInputFormat.class,
+        MapClass.class);
+    MultipleInputs.addInputPath(job, in2Dir, KeyValueTextInputFormat.class,
+        KeyValueMapClass.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setReducerClass(ReducerClass.class);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    boolean success = false;
+    try {
+      success = job.waitForCompletion(true);
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException(cnfe);
+    }
+    if (!success)
+      throw new RuntimeException("Job failed!");
+
+    // read the reducer output back line by line
+    BufferedReader output = new BufferedReader(new InputStreamReader(fs
+        .open(new Path(outDir, "part-r-00000"))));
+    // the reducer should have seen each key twice, once from each input
+    assertTrue(output.readLine().equals("a 2"));
+    assertTrue(output.readLine().equals("b 2"));
+    assertTrue(output.readLine().equals("c 2"));
+    assertTrue(output.readLine().equals("d 2"));
+    assertTrue(output.readLine().equals("e 2"));
+  }
+
   @SuppressWarnings("unchecked")
   public void testAddInputPathWithFormat() throws IOException {
     final Job conf = new Job();
@@ -50,7 +158,7 @@
     MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
        MapClass.class);
     MultipleInputs.addInputPath(conf, new Path("/bar"),
-       KeyValueTextInputFormat.class, MapClass2.class);
+        KeyValueTextInputFormat.class, KeyValueMapClass.class);
     final Map<Path, InputFormat> inputs = MultipleInputs
        .getInputFormatMap(conf);
     final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
@@ -60,12 +168,42 @@
     assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
        .getClass());
     assertEquals(MapClass.class, maps.get(new Path("/foo")));
-    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+    assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar")));
   }
 
-  static class MapClass extends Mapper<String, String, String, String> {
+  static final Text blah = new Text("blah");
+
+  // these three classes do a reduce-side join with two different mappers
+  static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
+    // receives "a", "b", "c" as values
+    @Override
+    public void map(LongWritable key, Text value, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(value, blah);
+    }
   }
 
-  static class MapClass2 extends MapClass {
+  static class KeyValueMapClass extends Mapper<Text, Text, Text, Text> {
+    // receives "a", "b", "c" as keys
+    @Override
+    public void map(Text key, Text value, Context ctx) throws IOException,
+        InterruptedException {
+      ctx.write(key, blah);
+    }
   }
+
+  static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> {
+    // should receive 2 rows for each key
+    int count = 0;
+
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context ctx)
+        throws IOException, InterruptedException {
+      count = 0;
+      for (Text value : values)
+        count++;
+      ctx.write(NullWritable.get(), new Text(key.toString() + " " + count));
+    }
+  }
+
 }