Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/24 08:19:23 UTC

svn commit: r807096 [1/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/join/ src/java/org/apache/hadoop/mapreduce/lib/join/ src/test/ src/test/mapred/org/apache/hadoop/mapreduce/ src/test/m...

Author: cdouglas
Date: Mon Aug 24 06:19:21 2009
New Revision: 807096

URL: http://svn.apache.org/viewvc?rev=807096&view=rev
Log:
MAPREDUCE-355. Update mapred.join package to use the new API. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/package.html
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestTupleWritable.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java
      - copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestWrappedRecordReaderClassloader.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
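
Note on the overall shape of the change: as the diffs below show, every class in the old org.apache.hadoop.mapred.join package is deprecated and reduced to a thin shim over its counterpart in the new org.apache.hadoop.mapreduce.lib.join package. A schematic sketch of that shim pattern follows ("Foo" is a placeholder, not a real class in this commit):

    package org.apache.hadoop.mapred.join;

    import org.apache.hadoop.io.Writable;

    /**
     * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.join.Foo} instead
     */
    @Deprecated
    public class Foo<X extends Writable>
        extends org.apache.hadoop.mapreduce.lib.join.Foo<X> {
      public Foo() {
        super(); // all behavior now lives in the new-package implementation
      }
    }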

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 24 06:19:21 2009
@@ -243,6 +243,9 @@
       add mapred.reduce.child.log.level 
     (acmurthy)
 
+    MAPREDUCE-355. Update mapred.join package to use the new API. (Amareshwari
+    Sriramadasu via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java Mon Aug 24 06:19:21 2009
@@ -27,19 +27,23 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.join.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
- * This is the trivial map/reduce program that does absolutely nothing
- * other than use the framework to fragment and sort the input values.
+ * Given a set of sorted datasets keyed with the same class and yielding
+ * equal partitions, it is possible to effect a join of those datasets 
+ * prior to the map. The example facilitates the same.
  *
  * To run: bin/hadoop jar build/hadoop-examples.jar join
- *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-r <i>reduces</i>]
  *            [-inFormat <i>input format class</i>] 
  *            [-outFormat <i>output format class</i>] 
  *            [-outKey <i>output key class</i>] 
@@ -50,7 +54,7 @@
 public class Join extends Configured implements Tool {
 
   static int printUsage() {
-    System.out.println("join [-m <maps>] [-r <reduces>] " +
+    System.out.println("join [-r <reduces>] " +
                        "[-inFormat <input format class>] " +
                        "[-outFormat <output format class>] " + 
                        "[-outKey <output key class>] " +
@@ -58,7 +62,7 @@
                        "[-joinOp <inner|outer|override>] " +
                        "[input]* <input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
 
   /**
@@ -67,23 +71,24 @@
    * @throws IOException When there is communication problems with the 
    *                     job tracker.
    */
+  @SuppressWarnings("unchecked")
   public int run(String[] args) throws Exception {
-    JobConf jobConf = new JobConf(getConf(), Sort.class);
-    jobConf.setJobName("join");
-
-    jobConf.setMapperClass(IdentityMapper.class);        
-    jobConf.setReducerClass(IdentityReducer.class);
-
-    JobClient client = new JobClient(jobConf);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int num_maps = cluster.getTaskTrackers() * 
-                   jobConf.getInt("test.sort.maps_per_host", 10);
     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
-    if (sort_reduces != null) {
+    String join_reduces = conf.get("mapreduce.join.reduces_per_host");
+    if (join_reduces != null) {
        num_reduces = cluster.getTaskTrackers() * 
-                       Integer.parseInt(sort_reduces);
+                       Integer.parseInt(join_reduces);
     }
+    Job job = new Job(conf);
+    job.setJobName("join");
+    job.setJarByClass(Sort.class);
+
+    job.setMapperClass(Mapper.class);        
+    job.setReducerClass(Reducer.class);
+
     Class<? extends InputFormat> inputFormatClass = 
       SequenceFileInputFormat.class;
     Class<? extends OutputFormat> outputFormatClass = 
@@ -94,9 +99,7 @@
     List<String> otherArgs = new ArrayList<String>();
     for(int i=0; i < args.length; ++i) {
       try {
-        if ("-m".equals(args[i])) {
-          num_maps = Integer.parseInt(args[++i]);
-        } else if ("-r".equals(args[i])) {
+        if ("-r".equals(args[i])) {
           num_reduces = Integer.parseInt(args[++i]);
         } else if ("-inFormat".equals(args[i])) {
           inputFormatClass = 
@@ -126,37 +129,37 @@
     }
 
     // Set user-supplied (possibly default) job configs
-    jobConf.setNumMapTasks(num_maps);
-    jobConf.setNumReduceTasks(num_reduces);
+    job.setNumReduceTasks(num_reduces);
 
     if (otherArgs.size() < 2) {
       System.out.println("ERROR: Wrong number of parameters: ");
       return printUsage();
     }
 
-    FileOutputFormat.setOutputPath(jobConf, 
+    FileOutputFormat.setOutputPath(job, 
       new Path(otherArgs.remove(otherArgs.size() - 1)));
     List<Path> plist = new ArrayList<Path>(otherArgs.size());
     for (String s : otherArgs) {
       plist.add(new Path(s));
     }
 
-    jobConf.setInputFormat(CompositeInputFormat.class);
-    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
-          op, inputFormatClass, plist.toArray(new Path[0])));
-    jobConf.setOutputFormat(outputFormatClass);
+    job.setInputFormatClass(CompositeInputFormat.class);
+    job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, 
+      CompositeInputFormat.compose(op, inputFormatClass,
+      plist.toArray(new Path[0])));
+    job.setOutputFormatClass(outputFormatClass);
 
-    jobConf.setOutputKeyClass(outputKeyClass);
-    jobConf.setOutputValueClass(outputValueClass);
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
 
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(jobConf);
+    int ret = job.waitForCompletion(true) ? 0 : 1 ;
     Date end_time = new Date();
     System.out.println("Job ended: " + end_time);
     System.out.println("The job took " + 
         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-    return 0;
+    return ret;
   }
 
   public static void main(String[] args) throws Exception {
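
Distilled from the Join.java diff above, the updated example amounts to the following new-API job setup. This is a minimal standalone sketch, assuming two pre-sorted, identically partitioned SequenceFile inputs under the hypothetical paths in/a and in/b with Text join keys; the real example additionally parses its options from the command line:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class JoinSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName("join");
        // The composite format performs the join before the map; the
        // default identity Mapper and Reducer pass the tuples through.
        job.setInputFormatClass(CompositeInputFormat.class);
        job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
            CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
                new Path("in/a"), new Path("in/b")));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);            // join key type (illustrative)
        job.setOutputValueClass(TupleWritable.class); // joined values
        FileOutputFormat.setOutputPath(job, new Path("out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }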

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -17,73 +17,28 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * This class provides an implementation of ResetableIterator. The
  * implementation uses an {@link java.util.ArrayList} to store elements
  * added to it, replaying them as requested.
  * Prefer {@link StreamBackedIterator}.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator} instead
  */
-public class ArrayListBackedIterator<X extends Writable>
+@Deprecated
+public class ArrayListBackedIterator<X extends Writable> extends 
+    org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator<X>
     implements ResetableIterator<X> {
 
-  private Iterator<X> iter;
-  private ArrayList<X> data;
-  private X hold = null;
-
   public ArrayListBackedIterator() {
-    this(new ArrayList<X>());
+    super();
   }
 
   public ArrayListBackedIterator(ArrayList<X> data) {
-    this.data = data;
-    this.iter = this.data.iterator();
-  }
-
-  public boolean hasNext() {
-    return iter.hasNext();
-  }
-
-  public boolean next(X val) throws IOException {
-    if (iter.hasNext()) {
-      WritableUtils.cloneInto(val, iter.next());
-      if (null == hold) {
-        hold = WritableUtils.clone(val, null);
-      } else {
-        WritableUtils.cloneInto(hold, val);
-      }
-      return true;
-    }
-    return false;
-  }
-
-  public boolean replay(X val) throws IOException {
-    WritableUtils.cloneInto(val, hold);
-    return true;
-  }
-
-  public void reset() {
-    iter = data.iterator();
-  }
-
-  public void add(X item) throws IOException {
-    data.add(WritableUtils.clone(item, null));
-  }
-
-  public void close() throws IOException {
-    iter = null;
-    data = null;
-  }
-
-  public void clear() {
-    data.clear();
-    reset();
+    super(data);
   }
-
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java Mon Aug 24 06:19:21 2009
@@ -30,7 +30,11 @@
 /**
  * Refinement of InputFormat requiring implementors to provide
  * ComposableRecordReader instead of RecordReader.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableInputFormat} instead
  */
+@Deprecated
 public interface ComposableInputFormat<K extends WritableComparable,
                                        V extends Writable>
     extends InputFormat<K,V> {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,10 @@
 
 /**
  * Additional operations required of a RecordReader to participate in a join.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableRecordReader} instead
  */
+@Deprecated
 public interface ComposableRecordReader<K extends WritableComparable,
                                  V extends Writable>
     extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Mon Aug 24 06:19:21 2009
@@ -44,7 +44,10 @@
  * in the join.
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat} instead
  */
+@Deprecated
 public class CompositeInputFormat<K extends WritableComparable>
       implements ComposableInputFormat<K,TupleWritable> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java Mon Aug 24 06:19:21 2009
@@ -31,7 +31,11 @@
 /**
  * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
  * into this collection must have a public default constructor.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit} instead
  */
+@Deprecated
 public class CompositeInputSplit implements InputSplit {
 
   private int fill = 0;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Mon Aug 24 06:19:21 2009
@@ -34,7 +34,11 @@
 /**
  * A RecordReader that can effect joins of RecordReaders sharing a common key
  * type and partitioning.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeRecordReader} instead
  */
+@Deprecated
 public abstract class CompositeRecordReader<
     K extends WritableComparable, // key type
     V extends Writable,           // accepts RecordReader<K,V> as children

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,11 @@
 
 /**
  * Full inner join.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.InnerJoinRecordReader} instead.
  */
+@Deprecated
 public class InnerJoinRecordReader<K extends WritableComparable>
     extends JoinRecordReader<K> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -29,7 +29,10 @@
 
 /**
  * Base class for Composite joins returning Tuples of arbitrary Writables.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.JoinRecordReader} instead
  */
+@Deprecated
 public abstract class JoinRecordReader<K extends WritableComparable>
     extends CompositeRecordReader<K,Writable,TupleWritable>
     implements ComposableRecordReader<K,TupleWritable> {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Mon Aug 24 06:19:21 2009
@@ -32,7 +32,10 @@
 /**
  * Base class for Composite join returning values derived from multiple
  * sources, but generally not tuples.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.MultiFilterRecordReader} instead
  */
+@Deprecated
 public abstract class MultiFilterRecordReader<K extends WritableComparable,
                                               V extends Writable>
     extends CompositeRecordReader<K,V,V>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,11 @@
 
 /**
  * Full outer join.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.OuterJoinRecordReader} instead
  */
+@Deprecated
 public class OuterJoinRecordReader<K extends WritableComparable>
     extends JoinRecordReader<K> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java Mon Aug 24 06:19:21 2009
@@ -32,7 +32,10 @@
  * For example, <tt>override(S1,S2,S3)</tt> will prefer values
  * from S3 over S2, and values from S2 over S1 for all keys
  * emitted from all sources.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.OverrideRecordReader} instead
  */
+@Deprecated
 public class OverrideRecordReader<K extends WritableComparable,
                                   V extends Writable>
     extends MultiFilterRecordReader<K,V> {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java Mon Aug 24 06:19:21 2009
@@ -59,7 +59,9 @@
  * straightforward. One need only override the relevant method(s) (usually only
  * {@link CompositeRecordReader#combine}) and include a property to map its
  * value to an identifier in the parser.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.join.Parser} instead
  */
+@Deprecated
 public class Parser {
   public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java Mon Aug 24 06:19:21 2009
@@ -17,77 +17,22 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 
 /**
  * This defines an interface to a stateful Iterator that can replay elements
  * added to it directly.
  * Note that this does not extend {@link java.util.Iterator}.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ResetableIterator} instead
  */
-public interface ResetableIterator<T extends Writable> {
+@Deprecated
+public interface ResetableIterator<T extends Writable> 
+    extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator<T> {
 
   public static class EMPTY<U extends Writable>
-    implements ResetableIterator<U> {
-    public boolean hasNext() { return false; }
-    public void reset() { }
-    public void close() throws IOException { }
-    public void clear() { }
-    public boolean next(U val) throws IOException {
-      return false;
-    }
-    public boolean replay(U val) throws IOException {
-      return false;
-    }
-    public void add(U item) throws IOException {
-      throw new UnsupportedOperationException();
-    }
+      extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator.EMPTY<U>
+      implements ResetableIterator<U> {
   }
-
-  /**
-   * True if a call to next may return a value. This is permitted false
-   * positives, but not false negatives.
-   */
-  public boolean hasNext();
-
-  /**
-   * Assign next value to actual.
-   * It is required that elements added to a ResetableIterator be returned in
-   * the same order after a call to {@link #reset} (FIFO).
-   *
-   * Note that a call to this may fail for nested joins (i.e. more elements
-   * available, but none satisfying the constraints of the join)
-   */
-  public boolean next(T val) throws IOException;
-
-  /**
-   * Assign last value returned to actual.
-   */
-  public boolean replay(T val) throws IOException;
-
-  /**
-   * Set iterator to return to the start of its range. Must be called after
-   * calling {@link #add} to avoid a ConcurrentModificationException.
-   */
-  public void reset();
-
-  /**
-   * Add an element to the collection of elements to iterate over.
-   */
-  public void add(T item) throws IOException;
-
-  /**
-   * Close datasources and release resources. Calling methods on the iterator
-   * after calling close has undefined behavior.
-   */
-  // XXX is this necessary?
-  public void close() throws IOException;
-
-  /**
-   * Close datasources, but do not release internal resources. Calling this
-   * method should permit the object to be reused with a different datasource.
-   */
-  public void clear();
-
 }
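
The contract spelled out in the javadoc removed above (FIFO next() after reset(), replay() of the last value returned, reset() required after add()) carries over unchanged to the new package. A minimal usage sketch against the new-package types, with error handling omitted:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.join.ResetableIterator;
    import org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator;

    public class ResetableIteratorDemo {
      public static void main(String[] args) throws IOException {
        ResetableIterator<Text> it = new StreamBackedIterator<Text>();
        it.add(new Text("a"));
        it.add(new Text("b"));
        it.reset();               // required after add(), per the contract
        Text val = new Text();
        while (it.next(val)) {    // elements come back in FIFO order
          System.out.println(val);
        }
        it.replay(val);           // re-fills val with the last value returned
        it.close();
      }
    }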

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -17,83 +17,17 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 
 /**
  * This class provides an implementation of ResetableIterator. This
  * implementation uses a byte array to store elements added to it.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator} instead
  */
+@Deprecated
 public class StreamBackedIterator<X extends Writable>
+    extends org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator<X>
     implements ResetableIterator<X> {
-
-  private static class ReplayableByteInputStream extends ByteArrayInputStream {
-    public ReplayableByteInputStream(byte[] arr) {
-      super(arr);
-    }
-    public void resetStream() {
-      mark = 0;
-      reset();
-    }
-  }
-
-  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
-  private DataOutputStream outfbuf = new DataOutputStream(outbuf);
-  private ReplayableByteInputStream inbuf;
-  private DataInputStream infbuf;
-
-  public StreamBackedIterator() { }
-
-  public boolean hasNext() {
-    return infbuf != null && inbuf.available() > 0;
-  }
-
-  public boolean next(X val) throws IOException {
-    if (hasNext()) {
-      inbuf.mark(0);
-      val.readFields(infbuf);
-      return true;
-    }
-    return false;
-  }
-
-  public boolean replay(X val) throws IOException {
-    inbuf.reset();
-    if (0 == inbuf.available())
-      return false;
-    val.readFields(infbuf);
-    return true;
-  }
-
-  public void reset() {
-    if (null != outfbuf) {
-      inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
-      infbuf =  new DataInputStream(inbuf);
-      outfbuf = null;
-    }
-    inbuf.resetStream();
-  }
-
-  public void add(X item) throws IOException {
-    item.write(outfbuf);
-  }
-
-  public void close() throws IOException {
-    if (null != infbuf)
-      infbuf.close();
-    if (null != outfbuf)
-      outfbuf.close();
-  }
-
-  public void clear() {
-    if (null != inbuf)
-      inbuf.resetStream();
-    outbuf.reset();
-    outfbuf = new DataOutputStream(outbuf);
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Mon Aug 24 06:19:21 2009
@@ -18,16 +18,7 @@
 
 package org.apache.hadoop.mapred.join;
 
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
@@ -40,17 +31,19 @@
  * incompatible with, but contrary to the general case.
  *
  * @see org.apache.hadoop.io.Writable
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.TupleWritable} instead
  */
-public class TupleWritable implements Writable, Iterable<Writable> {
-
-  private BitSet written;
-  private Writable[] values;
+@Deprecated
+public class TupleWritable 
+    extends org.apache.hadoop.mapreduce.lib.join.TupleWritable {
 
   /**
    * Create an empty tuple with no allocated storage for writables.
    */
   public TupleWritable() {
-    written = new BitSet(0);
+    super();
   }
 
   /**
@@ -58,148 +51,7 @@
    * &quot;written&quot; values.
    */
   public TupleWritable(Writable[] vals) {
-    written = new BitSet(vals.length);
-    values = vals;
-  }
-
-  /**
-   * Return true if tuple has an element at the position provided.
-   */
-  public boolean has(int i) {
-    return written.get(i);
-  }
-
-  /**
-   * Get ith Writable from Tuple.
-   */
-  public Writable get(int i) {
-    return values[i];
-  }
-
-  /**
-   * The number of children in this Tuple.
-   */
-  public int size() {
-    return values.length;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public boolean equals(Object other) {
-    if (other instanceof TupleWritable) {
-      TupleWritable that = (TupleWritable)other;
-      if (!this.written.equals(that.written)) {
-        return false;
-      }
-      for (int i = 0; i < values.length; ++i) {
-        if (!has(i)) continue;
-        if (!values[i].equals(that.get(i))) {
-          return false;
-        }
-      }
-      return true;
-    }
-    return false;
-  }
-
-  public int hashCode() {
-    assert false : "hashCode not designed";
-    return written.hashCode();
-  }
-
-  /**
-   * Return an iterator over the elements in this tuple.
-   * Note that this doesn't flatten the tuple; one may receive tuples
-   * from this iterator.
-   */
-  public Iterator<Writable> iterator() {
-    final TupleWritable t = this;
-    return new Iterator<Writable>() {
-      int bitIndex = written.nextSetBit(0);
-      public boolean hasNext() {
-        return bitIndex >= 0;
-      }
-      public Writable next() {
-        int returnIndex = bitIndex;
-        if (returnIndex < 0)
-          throw new NoSuchElementException();
-        bitIndex = written.nextSetBit(bitIndex+1);
-        return t.get(returnIndex);
-      }
-      public void remove() {
-        if (!written.get(bitIndex)) {
-          throw new IllegalStateException("Attempt to remove non-existent val");
-        }
-        written.clear(bitIndex);
-      }
-    };
-  }
-
-  /**
-   * Convert Tuple to String as in the following.
-   * <tt>[<child1>,<child2>,...,<childn>]</tt>
-   */
-  public String toString() {
-    StringBuffer buf = new StringBuffer("[");
-    for (int i = 0; i < values.length; ++i) {
-      buf.append(has(i) ? values[i].toString() : "");
-      buf.append(",");
-    }
-    if (values.length != 0)
-      buf.setCharAt(buf.length() - 1, ']');
-    else
-      buf.append(']');
-    return buf.toString();
-  }
-
-  // Writable
-
-  /** Writes each Writable to <code>out</code>.
-   * TupleWritable format:
-   * {@code
-   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
-   * }
-   */
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    writeBitSet(out, values.length, written);
-    for (int i = 0; i < values.length; ++i) {
-      Text.writeString(out, values[i].getClass().getName());
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        values[i].write(out);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
-  public void readFields(DataInput in) throws IOException {
-    int card = WritableUtils.readVInt(in);
-    values = new Writable[card];
-    readBitSet(in, card, written);
-    Class<? extends Writable>[] cls = new Class[card];
-    try {
-      for (int i = 0; i < card; ++i) {
-        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
-      }
-      for (int i = 0; i < card; ++i) {
-          values[i] = cls[i].newInstance();
-        if (has(i)) {
-          values[i].readFields(in);
-        }
-      }
-    } catch (ClassNotFoundException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    }
+    super(vals);
   }
 
   /**
@@ -225,64 +77,5 @@
     written.clear();
   }
 
-  /**
-   * Writes the bit set to the stream. The first 64 bit-positions of the bit set
-   * are written as a VLong for backwards-compatibility with older versions of
-   * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8
-   * bit-positions.
-   */
-  private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
-      throws IOException {
-    long bits = 0L;
-        
-    int bitSetIndex = bitSet.nextSetBit(0);
-    for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
-            bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
-      bits |= 1L << bitSetIndex;
-    }
-    WritableUtils.writeVLong(stream,bits);
-    
-    if (nbits > Long.SIZE) {
-      bits = 0L;
-      for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits; 
-              bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
-        int bitsIndex = bitSetIndex % Byte.SIZE;
-        int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
-        if (word > lastWordWritten) {
-          stream.writeByte((byte)bits);
-          bits = 0L;
-          for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
-            stream.writeByte((byte)bits);
-          }
-        }
-        bits |= 1L << bitsIndex;
-      }
-      stream.writeByte((byte)bits);
-    }
-  }
 
-  /**
-   * Reads a bitset from the stream that has been written with
-   * {@link #writeBitSet(DataOutput, int, BitSet)}.
-   */
-  private static final void readBitSet(DataInput stream, int nbits, 
-      BitSet bitSet) throws IOException {
-    bitSet.clear();
-    long initialBits = WritableUtils.readVLong(stream);
-    long last = 0L;
-    while (0L != initialBits) {
-      last = Long.lowestOneBit(initialBits);
-      initialBits ^= last;
-      bitSet.set(Long.numberOfTrailingZeros(last));
-    }
-    
-    for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
-      byte bits = stream.readByte();
-      while (0 != bits) {
-        last = Long.lowestOneBit(bits);
-        bits ^= last;
-        bitSet.set(Long.numberOfTrailingZeros(last) + offset);
-      }
-    }
-  }
 }
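
For orientation, the accessors kept by the new-package TupleWritable behave as the removed code above documents: get(i) returns the ith child, size() the tuple width, and has()/the iterator report only positions marked "written" (the join record readers set those marks as they populate a tuple). A small illustrative sketch:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

    public class TupleWritableDemo {
      public static void main(String[] args) {
        Writable[] vals = { new Text("left"), new IntWritable(42) };
        TupleWritable tuple = new TupleWritable(vals);
        System.out.println(tuple.size());  // 2
        System.out.println(tuple.get(1));  // 42
        // has(i) stays false here: the constructor does not mark positions
        // written; the join framework does that internally.
        System.out.println(tuple.has(0));  // false
      }
    }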

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Mon Aug 24 06:19:21 2009
@@ -31,7 +31,10 @@
  * This class keeps track of the &quot;head&quot; key-value pair for the
  * provided RecordReader and keeps a store of values matching a key when
  * this source is participating in a join.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader} instead
  */
+@Deprecated
 public class WrappedRecordReader<K extends WritableComparable,
                           U extends Writable>
     implements ComposableRecordReader<K,U> {

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * This class provides an implementation of ResetableIterator. The
@@ -36,6 +38,7 @@
   private Iterator<X> iter;
   private ArrayList<X> data;
   private X hold = null;
+  private Configuration conf = new Configuration();
 
   public ArrayListBackedIterator() {
     this(new ArrayList<X>());
@@ -52,11 +55,11 @@
 
   public boolean next(X val) throws IOException {
     if (iter.hasNext()) {
-      WritableUtils.cloneInto(val, iter.next());
+      ReflectionUtils.copy(conf, iter.next(), val);
       if (null == hold) {
         hold = WritableUtils.clone(val, null);
       } else {
-        WritableUtils.cloneInto(hold, val);
+        ReflectionUtils.copy(conf, val, hold);
       }
       return true;
     }
@@ -64,7 +67,7 @@
   }
 
   public boolean replay(X val) throws IOException {
-    WritableUtils.cloneInto(val, hold);
+    ReflectionUtils.copy(conf, hold, val);
     return true;
   }
 
@@ -85,5 +88,4 @@
     data.clear();
     reset();
   }
-
 }
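
The substantive change in this hunk is the swap from WritableUtils.cloneInto to ReflectionUtils.copy, which routes the copy through whatever serialization the Configuration declares for the type instead of requiring Writable-specific cloning. A minimal sketch of the call, using Text purely as an example type:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CopyDemo {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Text src = new Text("payload");
        Text dst = new Text();
        // Copies src into dst via the configured serialization framework
        // (Writable serialization by default).
        ReflectionUtils.copy(conf, src, dst);
        System.out.println(dst);  // payload
      }
    }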

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java Mon Aug 24 06:19:21 2009
@@ -16,25 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * Refinement of InputFormat requiring implementors to provide
  * ComposableRecordReader instead of RecordReader.
  */
-public interface ComposableInputFormat<K extends WritableComparable,
-                                       V extends Writable>
+public abstract class ComposableInputFormat<K extends WritableComparable<?>,
+                                            V extends Writable>
     extends InputFormat<K,V> {
 
-  ComposableRecordReader<K,V> getRecordReader(InputSplit split,
-      JobConf job, Reporter reporter) throws IOException;
+  public abstract ComposableRecordReader<K,V> createRecordReader(
+    InputSplit split, TaskAttemptContext context) 
+    throws IOException, InterruptedException;
+
 }
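
Since ComposableInputFormat is now an abstract class rather than an interface, concrete formats override createRecordReader(InputSplit, TaskAttemptContext) in place of the old getRecordReader(InputSplit, JobConf, Reporter), and still owe InputFormat a getSplits implementation. A hypothetical, non-functional skeleton showing the required shape:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.join.ComposableInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.ComposableRecordReader;

    public class DemoComposableInputFormat
        extends ComposableInputFormat<LongWritable, Text> {

      @Override
      public List<InputSplit> getSplits(JobContext context)
          throws IOException, InterruptedException {
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public ComposableRecordReader<LongWritable, Text> createRecordReader(
          InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        throw new UnsupportedOperationException("sketch only");
      }
    }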

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,50 +16,63 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.RecordReader;
 
 /**
  * Additional operations required of a RecordReader to participate in a join.
  */
-public interface ComposableRecordReader<K extends WritableComparable,
-                                 V extends Writable>
-    extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
+public abstract class ComposableRecordReader<K extends WritableComparable<?>,
+                                             V extends Writable>
+    extends RecordReader<K,V>
+    implements Comparable<ComposableRecordReader<K,?>> {
 
   /**
    * Return the position in the collector this class occupies.
    */
-  int id();
+  abstract int id();
 
   /**
    * Return the key this RecordReader would supply on a call to next(K,V)
    */
-  K key();
+  abstract K key();
 
   /**
    * Clone the key at the head of this RecordReader into the object provided.
    */
-  void key(K key) throws IOException;
+  abstract void key(K key) throws IOException;
 
   /**
+   * Create instance of key.
+   */
+  abstract K createKey();
+  
+  /**
+   * Create instance of value.
+   */
+  abstract V createValue();
+  
+  /**
    * Returns true if the stream is not empty, but provides no guarantee that
    * a call to next(K,V) will succeed.
    */
-  boolean hasNext();
+  abstract boolean hasNext();
 
   /**
    * Skip key-value pairs with keys less than or equal to the key provided.
    */
-  void skip(K key) throws IOException;
+  abstract void skip(K key) throws IOException, InterruptedException;
 
   /**
    * While key-value pairs from this RecordReader match the given key, register
    * them with the JoinCollector provided.
    */
-  void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+  @SuppressWarnings("unchecked")
+  abstract void accept(CompositeRecordReader.JoinCollector jc, K key) 
+      throws IOException, InterruptedException;
 }

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Mon Aug 24 06:19:21 2009
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * An InputFormat capable of performing joins over a set of data sources sorted
@@ -37,17 +40,21 @@
  * @see #setFormat
  *
  * A user may define new join types by setting the property
- * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
- * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
- * ComposableRecordReader.
- * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
- * in the join.
+ * <tt>mapreduce.join.define.&lt;ident&gt;</tt> to a classname. 
+ * In the expression <tt>mapreduce.join.expr</tt>, the identifier will be
+ * assumed to be a ComposableRecordReader.
+ * <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare 
+ * keys in the join.
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
  */
+@SuppressWarnings("unchecked")
 public class CompositeInputFormat<K extends WritableComparable>
-      implements ComposableInputFormat<K,TupleWritable> {
+    extends InputFormat<K, TupleWritable> {
 
+  public static final String JOIN_EXPR = "mapreduce.join.expr";
+  public static final String JOIN_COMPARATOR = "mapreduce.join.keycomparator";
+  
   // expression parse tree to which IF requests are proxied
   private Parser.Node root;
 
@@ -62,16 +69,16 @@
    *   class ::= @see java.lang.Class#forName(java.lang.String)
    *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
    * }
-   * Reads expression from the <tt>mapred.join.expr</tt> property and
-   * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
+   * Reads expression from the <tt>mapreduce.join.expr</tt> property and
+   * user-supplied join types from <tt>mapreduce.join.define.&lt;ident&gt;</tt>
    *  types. Paths supplied to <tt>tbl</tt> are given as input paths to the
    * InputFormat class listed.
    * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
    */
-  public void setFormat(JobConf job) throws IOException {
+  public void setFormat(Configuration conf) throws IOException {
     addDefaults();
-    addUserIdentifiers(job);
-    root = Parser.parse(job.get("mapred.join.expr", null), job);
+    addUserIdentifiers(conf);
+    root = Parser.parse(conf.get(JOIN_EXPR, null), conf);
   }
 
   /**
@@ -91,17 +98,16 @@
   /**
    * Inform the parser of user-defined types.
    */
-  private void addUserIdentifiers(JobConf job) throws IOException {
-    Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
-    for (Map.Entry<String,String> kv : job) {
+  private void addUserIdentifiers(Configuration conf) throws IOException {
+    Pattern x = Pattern.compile("^mapreduce\\.join\\.define\\.(\\w+)$");
+    for (Map.Entry<String,String> kv : conf) {
       Matcher m = x.matcher(kv.getKey());
       if (m.matches()) {
         try {
           Parser.CNode.addIdentifier(m.group(1),
-              job.getClass(m.group(0), null, ComposableRecordReader.class));
+              conf.getClass(m.group(0), null, ComposableRecordReader.class));
         } catch (NoSuchMethodException e) {
-          throw (IOException)new IOException(
-              "Invalid define for " + m.group(1)).initCause(e);
+          throw new IOException("Invalid define for " + m.group(1), e);
         }
       }
     }
@@ -111,10 +117,12 @@
    * Build a CompositeInputSplit from the child InputFormats by assigning the
    * ith split from each child to the ith composite split.
    */
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    setFormat(job);
-    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
-    return root.getSplits(job, numSplits);
+  @SuppressWarnings("unchecked")
+  public List<InputSplit> getSplits(JobContext job) 
+      throws IOException, InterruptedException {
+    setFormat(job.getConfiguration());
+    job.getConfiguration().setLong("mapred.min.split.size", Long.MAX_VALUE);
+    return root.getSplits(job);
   }
 
   /**
@@ -124,10 +132,11 @@
    * Mandating TupleWritable isn't strictly correct.
    */
   @SuppressWarnings("unchecked") // child types unknown
-  public ComposableRecordReader<K,TupleWritable> getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    setFormat(job);
-    return root.getRecordReader(split, job, reporter);
+  public RecordReader<K,TupleWritable> createRecordReader(InputSplit split, 
+      TaskAttemptContext taskContext) 
+      throws IOException, InterruptedException {
+    setFormat(taskContext.getConfiguration());
+    return root.createRecordReader(split, taskContext);
   }
 
   /**
@@ -135,8 +144,10 @@
    * Given InputFormat class (inf), path (p) return:
    * {@code tbl(<inf>, <p>) }
    */
-  public static String compose(Class<? extends InputFormat> inf, String path) {
-    return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+  public static String compose(Class<? extends InputFormat> inf, 
+      String path) {
+    return compose(inf.getName().intern(), path, 
+             new StringBuffer()).toString();
   }
 
   /**
@@ -144,8 +155,8 @@
    * Given operation (op), Object class (inf), set of paths (p) return:
    * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
    */
-  public static String compose(String op, Class<? extends InputFormat> inf,
-      String... path) {
+  public static String compose(String op, 
+      Class<? extends InputFormat> inf, String... path) {
     final String infname = inf.getName();
     StringBuffer ret = new StringBuffer(op + '(');
     for (String p : path) {
@@ -161,8 +172,8 @@
    * Given operation (op), Object class (inf), set of paths (p) return:
    * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
    */
-  public static String compose(String op, Class<? extends InputFormat> inf,
-      Path... path) {
+  public static String compose(String op, 
+      Class<? extends InputFormat> inf, Path... path) {
     ArrayList<String> tmp = new ArrayList<String>(path.length);
     for (Path p : path) {
       tmp.add(p.toString());
@@ -177,5 +188,4 @@
     sb.append("\")");
     return sb;
   }
-
 }
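
With setFormat(Configuration) in place, a job wires the join up through
ordinary configuration properties. A minimal sketch, not part of this
commit; it assumes the new JOIN_EXPR constant resolves to
"mapreduce.join.expr" (matching the mapreduce.join.define.* rename
above) and uses hypothetical paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

    public class ComposeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Inner join over two sorted, identically partitioned inputs.
        conf.set("mapreduce.join.expr",
            CompositeInputFormat.compose("inner",
                SequenceFileInputFormat.class,
                "hdfs:///data/a", "hdfs:///data/b"));
        Job job = new Job(conf, "compose sketch");
        job.setInputFormatClass(CompositeInputFormat.class);
        // Map tasks then receive one (key, TupleWritable) per joined key.
      }
    }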

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Mon Aug 24 06:19:21 2009
@@ -16,27 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
  * into this collection must have a public default constructor.
  */
-public class CompositeInputSplit implements InputSplit {
+public class CompositeInputSplit extends InputSplit implements Writable {
 
   private int fill = 0;
   private long totsize = 0L;
   private InputSplit[] splits;
+  private Configuration conf = new Configuration();
 
   public CompositeInputSplit() { }
 
@@ -49,7 +55,7 @@
    * @throws IOException If capacity was not specified during construction
    *                     or if capacity has been reached.
    */
-  public void add(InputSplit s) throws IOException {
+  public void add(InputSplit s) throws IOException, InterruptedException {
     if (null == splits) {
       throw new IOException("Uninitialized InputSplit");
     }
@@ -77,14 +83,14 @@
   /**
    * Get the length of ith child InputSplit.
    */
-  public long getLength(int i) throws IOException {
+  public long getLength(int i) throws IOException, InterruptedException {
     return splits[i].getLength();
   }
 
   /**
    * Collect a set of hosts from all child InputSplits.
    */
-  public String[] getLocations() throws IOException {
+  public String[] getLocations() throws IOException, InterruptedException {
     HashSet<String> hosts = new HashSet<String>();
     for (InputSplit s : splits) {
       String[] hints = s.getLocations();
@@ -100,7 +106,7 @@
   /**
    * getLocations from ith InputSplit.
    */
-  public String[] getLocation(int i) throws IOException {
+  public String[] getLocation(int i) throws IOException, InterruptedException {
     return splits[i].getLocations();
   }
 
@@ -110,20 +116,26 @@
    * <count><class1><class2>...<classn><split1><split2>...<splitn>
    * }
    */
+  @SuppressWarnings("unchecked")
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, splits.length);
     for (InputSplit s : splits) {
       Text.writeString(out, s.getClass().getName());
     }
     for (InputSplit s : splits) {
-      s.write(out);
+      SerializationFactory factory = new SerializationFactory(conf);
+      Serializer serializer = 
+        factory.getSerializer(s.getClass());
+      serializer.open((DataOutputStream)out);
+      serializer.serialize(s);
+      serializer.close();
     }
   }
 
   /**
    * {@inheritDoc}
    * @throws IOException If the child InputSplit cannot be read, typically
-   *                     for faliing access checks.
+   *                     for failing access checks.
    */
   @SuppressWarnings("unchecked")  // Generic array assignment
   public void readFields(DataInput in) throws IOException {
@@ -139,11 +151,14 @@
       }
       for (int i = 0; i < card; ++i) {
         splits[i] = ReflectionUtils.newInstance(cls[i], null);
-        splits[i].readFields(in);
+        SerializationFactory factory = new SerializationFactory(conf);
+        Deserializer deserializer = factory.getDeserializer(cls[i]);
+        deserializer.open((DataInputStream)in);
+        splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
+        deserializer.close();
       }
     } catch (ClassNotFoundException e) {
-      throw (IOException)new IOException("Failed split init").initCause(e);
+      throw new IOException("Failed split init", e);
     }
   }
-
 }
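
CompositeInputSplit now pushes child splits through the serialization
framework instead of assuming they are Writable. A standalone sketch of
the round trip that write()/readFields() perform per child, assuming
the default WritableSerialization covers the split class (as it does
for the new-API FileSplit used here):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SplitSerdeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);
        FileSplit split =
            new FileSplit(new Path("part-00000"), 0L, 1024L, new String[0]);

        // Serialize, as write() does for each child split.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        Serializer<FileSplit> ser = factory.getSerializer(FileSplit.class);
        ser.open(buf);
        ser.serialize(split);
        ser.close();

        // Deserialize into a fresh instance, as readFields() does.
        Deserializer<FileSplit> de = factory.getDeserializer(FileSplit.class);
        de.open(new ByteArrayInputStream(buf.toByteArray()));
        FileSplit copy = de.deserialize(new FileSplit());
        de.close();
        System.out.println(copy.getPath() + " " + copy.getLength());
      }
    }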

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java Mon Aug 24 06:19:21 2009
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -24,11 +24,13 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -36,24 +38,28 @@
  * type and partitioning.
  */
 public abstract class CompositeRecordReader<
-    K extends WritableComparable, // key type
-    V extends Writable,           // accepts RecordReader<K,V> as children
-    X extends Writable>           // emits Writables of this type
+    K extends WritableComparable<?>, // key type
+    V extends Writable,  // accepts RecordReader<K,V> as children
+    X extends Writable>  // emits Writables of this type
+    extends ComposableRecordReader<K, X>
     implements Configurable {
 
-
   private int id;
-  private Configuration conf;
+  protected Configuration conf;
   private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
 
   private WritableComparator cmp;
-  private Class<? extends WritableComparable> keyclass;
+  @SuppressWarnings("unchecked")
+  protected Class<? extends WritableComparable> keyclass = null;
   private PriorityQueue<ComposableRecordReader<K,?>> q;
 
   protected final JoinCollector jc;
   protected final ComposableRecordReader<K,? extends V>[] kids;
 
   protected abstract boolean combine(Object[] srcs, TupleWritable value);
+  
+  protected K key;
+  protected X value;
 
   /**
    * Create a RecordReader with <tt>capacity</tt> children to position
@@ -70,17 +76,56 @@
     if (null != cmpcl) {
       cmp = ReflectionUtils.newInstance(cmpcl, null);
       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
-          new Comparator<ComposableRecordReader<K,?>>() {
-            public int compare(ComposableRecordReader<K,?> o1,
-                               ComposableRecordReader<K,?> o2) {
-              return cmp.compare(o1.key(), o2.key());
-            }
-          });
+            new Comparator<ComposableRecordReader<K,?>>() {
+              public int compare(ComposableRecordReader<K,?> o1,
+                                 ComposableRecordReader<K,?> o2) {
+                return cmp.compare(o1.key(), o2.key());
+              }
+            });
     }
     jc = new JoinCollector(capacity);
     kids = new ComposableRecordReader[capacity];
   }
 
+  @SuppressWarnings("unchecked")
+  public void initialize(InputSplit split, TaskAttemptContext context) 
+      throws IOException, InterruptedException {
+    if (kids != null) {
+      for (int i = 0; i < kids.length; ++i) {
+        kids[i].initialize(((CompositeInputSplit)split).get(i), context);
+        if (kids[i].key() == null) {
+          continue;
+        }
+        
+        // get keyclass
+        if (keyclass == null) {
+          keyclass = kids[i].createKey().getClass().
+            asSubclass(WritableComparable.class);
+        }
+        // create priority queue
+        if (null == q) {
+          cmp = WritableComparator.get(keyclass);
+          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+                new Comparator<ComposableRecordReader<K,?>>() {
+                  public int compare(ComposableRecordReader<K,?> o1,
+                                     ComposableRecordReader<K,?> o2) {
+                    return cmp.compare(o1.key(), o2.key());
+                  }
+                });
+        }
+        // Explicit check for key class agreement
+        if (!keyclass.equals(kids[i].key().getClass())) {
+          throw new ClassCastException("Child key classes fail to agree");
+        }
+        
+        // add the kid to priority queue if it has any elements
+        if (kids[i].hasNext()) {
+          q.add(kids[i]);
+        }
+      }
+    }
+  }
+
   /**
    * Return the position in the collector this class occupies.
    */
@@ -123,21 +168,9 @@
    * entry will appear. Adding RecordReaders with the same id has
    * undefined behavior.
    */
-  public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
+  public void add(ComposableRecordReader<K,? extends V> rr) 
+      throws IOException, InterruptedException {
     kids[rr.id()] = rr;
-    if (null == q) {
-      cmp = WritableComparator.get(rr.createKey().getClass());
-      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
-          new Comparator<ComposableRecordReader<K,?>>() {
-            public int compare(ComposableRecordReader<K,?> o1,
-                               ComposableRecordReader<K,?> o2) {
-              return cmp.compare(o1.key(), o2.key());
-            }
-          });
-    }
-    if (rr.hasNext()) {
-      q.add(rr);
-    }
   }
 
   /**
@@ -146,7 +179,7 @@
    * one or more child RR contain duplicate keys, this will emit the cross
    * product of the associated values until exhausted.
    */
-  class JoinCollector {
+  public class JoinCollector {
     private K key;
     private ResetableIterator<X>[] iters;
     private int pos = -1;
@@ -208,7 +241,7 @@
     /**
      * Returns false if exhausted or if reset(K) has not been called.
      */
-    protected boolean hasNext() {
+    public boolean hasNext() {
       return !(pos < 0);
     }
 
@@ -218,7 +251,7 @@
      * sources s_1...s_n sharing key k, repeated calls to next should yield
      * I x I.
      */
-    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+    @SuppressWarnings("unchecked") // No static type info on Tuples
     protected boolean next(TupleWritable val) throws IOException {
       if (first) {
         int i = -1;
@@ -321,9 +354,13 @@
    * Clone the key at the top of this RR into the given object.
    */
   public void key(K key) throws IOException {
-    WritableUtils.cloneInto(key, key());
+    ReflectionUtils.copy(conf, key(), key);
   }
 
+  public K getCurrentKey() {
+    return key;
+  }
+  
   /**
    * Return true if it is possible that this could emit more values.
    */
@@ -334,7 +371,7 @@
   /**
    * Pass skip key to child RRs.
    */
-  public void skip(K key) throws IOException {
+  public void skip(K key) throws IOException, InterruptedException {
     ArrayList<ComposableRecordReader<K,?>> tmp =
       new ArrayList<ComposableRecordReader<K,?>>();
     while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
@@ -359,8 +396,9 @@
    * iterator over values it may emit.
    */
   @SuppressWarnings("unchecked") // No values from static EMPTY class
+  @Override
   public void accept(CompositeRecordReader.JoinCollector jc, K key)
-      throws IOException {
+      throws IOException, InterruptedException {
     if (hasNext() && 0 == cmp.compare(key, key())) {
       fillJoinCollector(createKey());
       jc.add(id, getDelegate());
@@ -373,7 +411,8 @@
    * For all child RRs offering the key provided, obtain an iterator
    * at that position in the JoinCollector.
    */
-  protected void fillJoinCollector(K iterkey) throws IOException {
+  protected void fillJoinCollector(K iterkey) 
+      throws IOException, InterruptedException {
     if (!q.isEmpty()) {
       q.peek().key(iterkey);
       while (0 == cmp.compare(q.peek().key(), iterkey)) {
@@ -397,19 +436,13 @@
   }
 
   /**
-   * Create a new key value common to all child RRs.
+   * Create a new key common to all child RRs.
    * @throws ClassCastException if key classes differ.
    */
-  @SuppressWarnings("unchecked") // Explicit check for key class agreement
-  public K createKey() {
-    if (null == keyclass) {
-      final Class<?> cls = kids[0].createKey().getClass();
-      for (RecordReader<K,? extends Writable> rr : kids) {
-        if (!cls.equals(rr.createKey().getClass())) {
-          throw new ClassCastException("Child key classes fail to agree");
-        }
-      }
-      keyclass = cls.asSubclass(WritableComparable.class);
+  @SuppressWarnings("unchecked")
+  protected K createKey() {
+    if (keyclass == null || keyclass.equals(NullWritable.class)) {
+      return (K) NullWritable.get();
     }
     return (K) ReflectionUtils.newInstance(keyclass, getConf());
   }
@@ -417,7 +450,7 @@
   /**
    * Create a value to be used internally for joins.
    */
-  protected TupleWritable createInternalValue() {
+  protected TupleWritable createTupleWritable() {
     Writable[] vals = new Writable[kids.length];
     for (int i = 0; i < vals.length; ++i) {
       vals[i] = kids[i].createValue();
@@ -425,11 +458,10 @@
     return new TupleWritable(vals);
   }
 
-  /**
-   * Unsupported (returns zero in all cases).
-   */
-  public long getPos() throws IOException {
-    return 0;
+  /** {@inheritDoc} */
+  public X getCurrentValue() 
+      throws IOException, InterruptedException {
+    return value;
   }
 
   /**
@@ -449,11 +481,12 @@
   /**
    * Report progress as the minimum of all child RR progress.
    */
-  public float getProgress() throws IOException {
+  public float getProgress() throws IOException, InterruptedException {
     float ret = 1.0f;
     for (RecordReader<K,? extends Writable> rr : kids) {
       ret = Math.min(ret, rr.getProgress());
     }
     return ret;
   }
+  
 }
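
Since ComposableRecordReader now extends the new-API RecordReader, the
reader is driven through initialize/nextKeyValue/getCurrentKey/
getCurrentValue rather than next(key, value). A sketch of that pull
loop, normally run by the framework; rr, split, and context are assumed
to come from CompositeInputFormat and the task:

    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

    public class DrainSketch {
      public static <K extends WritableComparable<?>> long drain(
          RecordReader<K, TupleWritable> rr, InputSplit split,
          TaskAttemptContext context)
          throws IOException, InterruptedException {
        rr.initialize(split, context); // wires child readers, key class, queue
        long records = 0;
        while (rr.nextKeyValue()) {    // replaces the old next(key, value)
          K key = rr.getCurrentKey();  // cached in the protected 'key' field
          TupleWritable val = rr.getCurrentValue();
          ++records;                   // consume (key, val) here
        }
        rr.close();
        return records;
      }
    }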

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,21 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Full inner join.
  */
-public class InnerJoinRecordReader<K extends WritableComparable>
+public class InnerJoinRecordReader<K extends WritableComparable<?>>
     extends JoinRecordReader<K> {
 
-  InnerJoinRecordReader(int id, JobConf conf, int capacity,
+  InnerJoinRecordReader(int id, Configuration conf, int capacity,
       Class<? extends WritableComparator> cmpcl) throws IOException {
     super(id, conf, capacity, cmpcl);
   }
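
Only the constructor signature changes here; the inner-join semantics
live in combine(), which this patch does not touch. In sketch form (a
paraphrase of the contract, not the committed body): emit a tuple only
when every child position is populated.

    protected boolean combine(Object[] srcs, TupleWritable dst) {
      for (int i = 0; i < srcs.length; ++i) {
        if (!dst.has(i)) {
          return false; // some source lacks this key: drop the tuple
        }
      }
      return true;      // all positions populated: emit
    }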

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,25 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Base class for Composite joins returning Tuples of arbitrary Writables.
  */
-public abstract class JoinRecordReader<K extends WritableComparable>
-    extends CompositeRecordReader<K,Writable,TupleWritable>
-    implements ComposableRecordReader<K,TupleWritable> {
+public abstract class JoinRecordReader<K extends WritableComparable<?>>
+    extends CompositeRecordReader<K,Writable,TupleWritable> {
 
-  public JoinRecordReader(int id, JobConf conf, int capacity,
+  public JoinRecordReader(int id, Configuration conf, int capacity,
       Class<? extends WritableComparator> cmpcl) throws IOException {
     super(id, capacity, cmpcl);
     setConf(conf);
@@ -44,19 +43,27 @@
    * Emit the next set of key, value pairs as defined by the child
    * RecordReaders and operation associated with this composite RR.
    */
-  public boolean next(K key, TupleWritable value) throws IOException {
+  public boolean nextKeyValue() 
+      throws IOException, InterruptedException {
+    if (key == null) {
+      key = createKey();
+    }
     if (jc.flush(value)) {
-      WritableUtils.cloneInto(key, jc.key());
+      ReflectionUtils.copy(conf, jc.key(), key);
       return true;
     }
     jc.clear();
+    if (value == null) {
+      value = createValue();
+    }
+    final PriorityQueue<ComposableRecordReader<K,?>> q = 
+            getRecordReaderQueue();
     K iterkey = createKey();
-    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
-    while (!q.isEmpty()) {
+    while (q != null && !q.isEmpty()) {
       fillJoinCollector(iterkey);
       jc.reset(iterkey);
       if (jc.flush(value)) {
-        WritableUtils.cloneInto(key, jc.key());
+        ReflectionUtils.copy(conf, jc.key(), key);
         return true;
       }
       jc.clear();
@@ -64,9 +71,8 @@
     return false;
   }
 
-  /** {@inheritDoc} */
   public TupleWritable createValue() {
-    return createInternalValue();
+    return createTupleWritable();
   }
 
   /**
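
Downstream of nextKeyValue, a map task sees one (key, TupleWritable)
pair per joined key. A hypothetical mapper (text keys assumed)
illustrating the value shape:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

    public class JoinMapperSketch
        extends Mapper<Text, TupleWritable, Text, Text> {
      @Override
      protected void map(Text key, TupleWritable value, Context context)
          throws IOException, InterruptedException {
        // One output record per populated source position in the tuple.
        for (int i = 0; i < value.size(); ++i) {
          if (value.has(i)) { // source i produced a value for this key
            Writable v = value.get(i);
            context.write(key, new Text(v.toString()));
          }
        }
      }
    }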

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,32 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 
 /**
  * Base class for Composite join returning values derived from multiple
  * sources, but generally not tuples.
  */
-public abstract class MultiFilterRecordReader<K extends WritableComparable,
+public abstract class MultiFilterRecordReader<K extends WritableComparable<?>,
                                               V extends Writable>
-    extends CompositeRecordReader<K,V,V>
-    implements ComposableRecordReader<K,V> {
+    extends CompositeRecordReader<K,V,V> {
 
-  private Class<? extends Writable> valueclass;
-  private TupleWritable ivalue;
+  private TupleWritable ivalue = null;
 
-  public MultiFilterRecordReader(int id, JobConf conf, int capacity,
+  public MultiFilterRecordReader(int id, Configuration conf, int capacity,
       Class<? extends WritableComparator> cmpcl) throws IOException {
     super(id, capacity, cmpcl);
     setConf(conf);
@@ -65,21 +63,31 @@
   }
 
   /** {@inheritDoc} */
-  public boolean next(K key, V value) throws IOException {
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (key == null) {
+      key = createKey();
+    }
+    if (value == null) {
+      value = createValue();
+    }
     if (jc.flush(ivalue)) {
-      WritableUtils.cloneInto(key, jc.key());
-      WritableUtils.cloneInto(value, emit(ivalue));
+      ReflectionUtils.copy(conf, jc.key(), key);
+      ReflectionUtils.copy(conf, emit(ivalue), value);
       return true;
     }
+    if (ivalue == null) {
+      ivalue = createTupleWritable();
+    }
     jc.clear();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = 
+            getRecordReaderQueue();
     K iterkey = createKey();
-    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
-    while (!q.isEmpty()) {
+    while (q != null && !q.isEmpty()) {
       fillJoinCollector(iterkey);
       jc.reset(iterkey);
       if (jc.flush(ivalue)) {
-        WritableUtils.cloneInto(key, jc.key());
-        WritableUtils.cloneInto(value, emit(ivalue));
+        ReflectionUtils.copy(conf, jc.key(), key);
+        ReflectionUtils.copy(conf, emit(ivalue), value);
         return true;
       }
       jc.clear();
@@ -87,20 +95,10 @@
     return false;
   }
 
-  /** {@inheritDoc} */
-  @SuppressWarnings("unchecked") // Explicit check for value class agreement
-  public V createValue() {
-    if (null == valueclass) {
-      final Class<?> cls = kids[0].createValue().getClass();
-      for (RecordReader<K,? extends V> rr : kids) {
-        if (!cls.equals(rr.createValue().getClass())) {
-          throw new ClassCastException("Child value classes fail to agree");
-        }
-      }
-      valueclass = cls.asSubclass(Writable.class);
-      ivalue = createInternalValue();
-    }
-    return (V) ReflectionUtils.newInstance(valueclass, null);
+  @SuppressWarnings("unchecked")
+  public void initialize(InputSplit split, TaskAttemptContext context) 
+      throws IOException, InterruptedException {
+    super.initialize(split, context);
   }
 
   /**
@@ -124,13 +122,13 @@
     public boolean next(V val) throws IOException {
       boolean ret;
       if (ret = jc.flush(ivalue)) {
-        WritableUtils.cloneInto(val, emit(ivalue));
+        ReflectionUtils.copy(getConf(), emit(ivalue), val);
       }
       return ret;
     }
 
     public boolean replay(V val) throws IOException {
-      WritableUtils.cloneInto(val, emit(ivalue));
+      ReflectionUtils.copy(getConf(), emit(ivalue), val);
       return true;
     }
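
A pattern repeated throughout this commit: each
WritableUtils.cloneInto(dst, src) becomes ReflectionUtils.copy(conf,
src, dst), which routes the copy through the configured serialization
framework; note the flipped argument order. A standalone sketch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CopySketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Text src = new Text("joined value");
        Text dst = new Text();
        ReflectionUtils.copy(conf, src, dst); // dst now holds "joined value"
        System.out.println(dst);
      }
    }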
 

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,21 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Full outer join.
  */
-public class OuterJoinRecordReader<K extends WritableComparable>
+public class OuterJoinRecordReader<K extends WritableComparable<?>>
     extends JoinRecordReader<K> {
 
-  OuterJoinRecordReader(int id, JobConf conf, int capacity,
+  OuterJoinRecordReader(int id, Configuration conf, int capacity,
       Class<? extends WritableComparator> cmpcl) throws IOException {
     super(id, conf, capacity, cmpcl);
   }
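
Before the OverrideRecordReader changes below, a quick contrast of the
three built-in operators as composed expressions (hypothetical paths):
"inner" keeps keys present in every source, "outer" keeps every key
from any source, and "override" keeps only the rightmost source's value
for each key.

    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

    public class OperatorSketch {
      public static void main(String[] args) {
        for (String op : new String[] {"inner", "outer", "override"}) {
          System.out.println(op + " -> "
              + CompositeInputFormat.compose(op,
                  SequenceFileInputFormat.class,
                  "hdfs:///data/a", "hdfs:///data/b"));
        }
      }
    }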

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,16 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Prefer the &quot;rightmost&quot; data source for this key.
@@ -33,14 +35,15 @@
  * from S3 over S2, and values from S2 over S1 for all keys
  * emitted from all sources.
  */
-public class OverrideRecordReader<K extends WritableComparable,
+public class OverrideRecordReader<K extends WritableComparable<?>,
                                   V extends Writable>
     extends MultiFilterRecordReader<K,V> {
 
-  OverrideRecordReader(int id, JobConf conf, int capacity,
+  OverrideRecordReader(int id, Configuration conf, int capacity,
       Class<? extends WritableComparator> cmpcl) throws IOException {
     super(id, conf, capacity, cmpcl);
   }
+  private Class<? extends Writable> valueclass = null;
 
   /**
    * Emit the value with the highest position in the tuple.
@@ -50,6 +53,21 @@
     return (V) dst.iterator().next();
   }
 
+  @SuppressWarnings("unchecked") // Explicit check for value class agreement
+  public V createValue() {
+    if (null == valueclass) {
+      Class<?> cls = kids[kids.length -1].createValue().getClass();
+      for (int i = kids.length -1; cls.equals(NullWritable.class); i--) {
+        cls = kids[i].createValue().getClass();
+      }
+      valueclass = cls.asSubclass(Writable.class);
+    }
+    if (valueclass.equals(NullWritable.class)) {
+      return (V) NullWritable.get();
+    }
+    return (V) ReflectionUtils.newInstance(valueclass, null);
+  }
+
   /**
    * Instead of filling the JoinCollector with iterators from all
    * data sources, fill only the rightmost for this key.
@@ -59,9 +77,11 @@
    * n is the cardinality of the cross product of the discarded
    * streams for the given key.
    */
-  protected void fillJoinCollector(K iterkey) throws IOException {
-    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
-    if (!q.isEmpty()) {
+  protected void fillJoinCollector(K iterkey) 
+      throws IOException, InterruptedException {
+    final PriorityQueue<ComposableRecordReader<K,?>> q = 
+      getRecordReaderQueue();
+    if (q != null && !q.isEmpty()) {
       int highpos = -1;
       ArrayList<ComposableRecordReader<K,?>> list =
         new ArrayList<ComposableRecordReader<K,?>>(kids.length);