Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/12/07 23:01:35 UTC

svn commit: r602240 [1/2] - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/join/ src/test/org/apache/hadoop/mapred/join/

Author: omalley
Date: Fri Dec  7 14:01:32 2007
New Revision: 602240

URL: http://svn.apache.org/viewvc?rev=602240&view=rev
Log:
HADOOP-2085.  A library to support map-side joins of consistently 
partitioned and sorted data sets. Contributed by Chris Douglas.
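
For a sense of the API, here is a minimal sketch of a job that uses the
new library (driver class and paths are hypothetical; the inputs must be
sorted and identically partitioned):

    JobConf job = new JobConf(MyDriver.class);   // hypothetical driver class
    job.setInputFormat(CompositeInputFormat.class);
    job.set("mapred.join.expr", CompositeInputFormat.compose(
        "inner", SequenceFileInputFormat.class,
        new Path("/data/a"), new Path("/data/b")));  // hypothetical paths
    // Map tasks now receive a TupleWritable joining records sharing a key.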

Added:
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec  7 14:01:32 2007
@@ -40,7 +40,10 @@
 
     HADOOP-1652.  A utility to balance data among datanodes in a HDFS cluster.
     (Hairong Kuang via dhruba)
-    
+
+    HADOOP-2085.  A library to support map-side joins of consistently 
+    partitioned and sorted data sets. (Chris Douglas via omalley)
+
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri Dec  7 14:01:32 2007
@@ -48,6 +48,7 @@
       "A map/reduce tile laying program to find solutions to pentomino problems.");
       pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
+      pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
       pgd.driver(argv);
     }
     catch(Throwable e){

Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.join.*;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This map/reduce program effects a join over sorted, equally partitioned
+ * datasets: the map and reduce are identities, and the join itself is
+ * performed by the input format.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar join
+ *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-inFormat <i>input format class</i>] 
+ *            [-outFormat <i>output format class</i>] 
+ *            [-outKey <i>output key class</i>] 
+ *            [-outValue <i>output value class</i>] 
+ *            [-joinOp &lt;inner|outer|override&gt;]
+ *            [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i> 
+ */
+public class Join extends Configured implements Tool {
+
+  static int printUsage() {
+    System.out.println("join [-m <maps>] [-r <reduces>] " +
+                       "[-inFormat <input format class>] " +
+                       "[-outFormat <output format class>] " + 
+                       "[-outKey <output key class>] " +
+                       "[-outValue <output value class>] " +
+                       "[-joinOp <inner|outer|override>] " +
+                       "[input]* <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  /**
+   * The main driver for the join program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  public int run(String[] args) throws Exception {
+    JobConf jobConf = new JobConf(getConf(), Join.class);
+    jobConf.setJobName("join");
+
+    jobConf.setMapperClass(IdentityMapper.class);        
+    jobConf.setReducerClass(IdentityReducer.class);
+
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int num_maps = cluster.getTaskTrackers() *
+      jobConf.getInt("test.sort.maps_per_host", 10);
+    int num_reduces = cluster.getTaskTrackers() *
+      jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
+    Class<? extends InputFormat> inputFormatClass = 
+      SequenceFileInputFormat.class;
+    Class<? extends OutputFormat> outputFormatClass = 
+      SequenceFileOutputFormat.class;
+    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+    Class<? extends Writable> outputValueClass = TupleWritable.class;
+    String op = "inner";
+    List<String> otherArgs = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          num_maps = Integer.parseInt(args[++i]);
+        } else if ("-r".equals(args[i])) {
+          num_reduces = Integer.parseInt(args[++i]);
+        } else if ("-inFormat".equals(args[i])) {
+          inputFormatClass = 
+            Class.forName(args[++i]).asSubclass(InputFormat.class);
+        } else if ("-outFormat".equals(args[i])) {
+          outputFormatClass = 
+            Class.forName(args[++i]).asSubclass(OutputFormat.class);
+        } else if ("-outKey".equals(args[i])) {
+          outputKeyClass = 
+            Class.forName(args[++i]).asSubclass(WritableComparable.class);
+        } else if ("-outValue".equals(args[i])) {
+          outputValueClass = 
+            Class.forName(args[++i]).asSubclass(Writable.class);
+        } else if ("-joinOp".equals(args[i])) {
+          op = args[++i];
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage();
+      }
+    }
+
+    // Set user-supplied (possibly default) job configs
+    jobConf.setNumMapTasks(num_maps);
+    jobConf.setNumReduceTasks(num_reduces);
+
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+
+    jobConf.setOutputPath(new Path(otherArgs.remove(otherArgs.size() - 1)));
+    List<Path> plist = new ArrayList<Path>(otherArgs.size());
+    for (String s : otherArgs) {
+      plist.add(new Path(s));
+    }
+
+    jobConf.setInputFormat(CompositeInputFormat.class);
+    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
+          op, inputFormatClass, plist.toArray(new Path[0])));
+    jobConf.setOutputFormat(outputFormatClass);
+
+    jobConf.setOutputKeyClass(outputKeyClass);
+    jobConf.setOutputValueClass(outputValueClass);
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    JobClient.runJob(jobConf);
+    Date end_time = new Date();
+    System.out.println("Job ended: " + end_time);
+    System.out.println("The job took " + 
+        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Join(), args);
+    System.exit(res);
+  }
+
+}
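
As a usage sketch, the example above could be invoked as follows
(directory names are hypothetical; both inputs must be sorted and
partitioned identically):

    bin/hadoop jar build/hadoop-examples.jar join \
        -joinOp outer /data/a /data/b /data/out

The last path is taken as the output directory; all preceding paths are
join inputs.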

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java Fri Dec  7 14:01:32 2007
@@ -218,7 +218,7 @@
         return new CopyInCopyOutBuffer();
       }
     };
-  
+
   /**
    * Make a copy of a writable object using serialization to a buffer.
    * @param orig The object to copy
@@ -226,19 +226,30 @@
    */
   public static Writable clone(Writable orig, JobConf conf) {
     try {
-      Writable newInst = (Writable)ReflectionUtils.newInstance(orig.getClass(),
-                                                               conf);
-      CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
-      buffer.outBuffer.reset();
-      orig.write(buffer.outBuffer);
-      buffer.moveData();
-      newInst.readFields(buffer.inBuffer);
+      Writable newInst =
+        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      cloneInto(newInst, orig);
       return newInst;
     } catch (IOException e) {
       throw new RuntimeException("Error writing/reading clone buffer", e);
     }
   }
- 
+
+  /**
+   * Make a copy of the writable object using serialization to a buffer
+   * @param dst the object to copy into, whose contents are overwritten
+   * @param src the object to copy from
+   * @throws IOException
+   */
+  public static void cloneInto(Writable dst, Writable src) throws IOException {
+    CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
+    buffer.outBuffer.reset();
+    src.write(buffer.outBuffer);
+    buffer.moveData();
+    dst.readFields(buffer.inBuffer);
+  }
+
   /**
    * Serializes an integer to a binary stream with zero-compressed encoding.
    * For -120 <= i <= 127, only one byte is used with the actual value.
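
A quick illustration of how the new cloneInto helper relates to clone
(a minimal sketch; Text is used only as a convenient Writable):

    JobConf conf = new JobConf();                    // any configuration will do
    Text src = new Text("payload");
    Text dst = new Text();
    WritableUtils.cloneInto(dst, src);               // reuses dst; no new allocation
    Writable copy = WritableUtils.clone(src, conf);  // allocates, then copies via cloneInto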

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class provides an implementation of ResetableIterator. The
+ * implementation uses an {@link java.util.ArrayList} to store elements
+ * added to it, replaying them as requested.
+ * Prefer {@link StreamBackedIterator}.
+ */
+public class ArrayListBackedIterator<X extends Writable>
+    implements ResetableIterator<X> {
+
+  private Iterator<X> iter;
+  private ArrayList<X> data;
+  private X hold = null;
+
+  public ArrayListBackedIterator() {
+    this(new ArrayList<X>());
+  }
+
+  public ArrayListBackedIterator(ArrayList<X> data) {
+    this.data = data;
+    this.iter = this.data.iterator();
+  }
+
+  public boolean hasNext() {
+    return iter.hasNext();
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean next(X val) throws IOException {
+    if (iter.hasNext()) {
+      WritableUtils.cloneInto(val, iter.next());
+      if (null == hold) {
+        hold = (X) WritableUtils.clone(val, null);
+      } else {
+        WritableUtils.cloneInto(hold, val);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public void replay(X val) throws IOException {
+    WritableUtils.cloneInto(val, hold);
+  }
+
+  public void reset() {
+    iter = data.iterator();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void add(X item) throws IOException {
+    data.add((X) WritableUtils.clone(item, null));
+  }
+
+  public void close() throws IOException {
+    iter = null;
+    data = null;
+  }
+
+  public void clear() {
+    data.clear();
+    reset();
+  }
+
+}
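
A short sketch of the iterator contract (values added are cloned, then
replayed on demand; exception handling elided):

    ArrayListBackedIterator<Text> vals = new ArrayListBackedIterator<Text>();
    vals.add(new Text("a"));          // stores a private copy
    vals.add(new Text("b"));
    Text v = new Text();
    while (vals.next(v)) {            // copies the next element into v
      System.out.println(v);          // prints a, then b
    }
    vals.reset();                     // rewind: next() yields a, b again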

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Refinement of InputFormat requiring implementors to provide
+ * ComposableRecordReader instead of RecordReader.
+ */
+public interface ComposableInputFormat<K extends WritableComparable,
+                                       V extends Writable>
+    extends InputFormat<K,V> {
+
+  ComposableRecordReader<K,V> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException;
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Additional operations required of a RecordReader to participate in a join.
+ */
+interface ComposableRecordReader<K extends WritableComparable,
+                                 V extends Writable>
+    extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  int id();
+
+  /**
+   * Return the key this RecordReader would supply on a call to next(K,V)
+   */
+  K key();
+
+  /**
+   * Clone the key at the head of this RecordReader into the object provided.
+   */
+  void key(K key) throws IOException;
+
+  /**
+   * Returns true if the stream is not empty, but provides no guarantee that
+   * a call to next(K,V) will succeed.
+   */
+  boolean hasNext();
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  void skip(K key) throws IOException;
+
+  /**
+   * While key-value pairs from this RecordReader match the given key, register
+   * them with the JoinCollector provided.
+   */
+  void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An InputFormat capable of performing joins over a set of data sources sorted
+ * and partitioned the same way.
+ * @see #setFormat
+ *
+ * A user may define new join types by setting the property
+ * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
+ * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
+ * ComposableRecordReader.
+ * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
+ * in the join.
+ * @see JoinRecordReader
+ * @see MultiFilterRecordReader
+ */
+public class CompositeInputFormat<K extends WritableComparable>
+      implements ComposableInputFormat<K,TupleWritable> {
+
+  // expression parse tree to which IF requests are proxied
+  private Parser.Node root;
+
+  public CompositeInputFormat() { }
+
+
+  /**
+   * Interpret a given string as a composite expression.
+   * {@code
+   *   func  ::= <ident>([<func>,]*<func>)
+   *   func  ::= tbl(<class>,"<path>")
+   *   class ::= @see java.lang.Class#forName(java.lang.String)
+   *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
+   * }
+   * Reads expression from the <tt>mapred.join.expr</tt> property and
+   * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
+ * properties. Paths supplied to <tt>tbl</tt> are given as input paths to the
+   * InputFormat class listed.
+   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
+   */
+  public void setFormat(JobConf job) throws IOException {
+    addDefaults();
+    addUserIdentifiers(job);
+    Class<? extends WritableComparator> cmpcl =
+      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
+    root = Parser.parse(job.get("mapred.join.expr", null), cmpcl);
+  }
+
+  /**
+   * Adds the default set of identifiers to the parser.
+   */
+  protected void addDefaults() {
+    try {
+      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
+      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
+      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
+      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("FATAL: Failed to init defaults", e);
+    }
+  }
+
+  /**
+   * Inform the parser of user-defined types.
+   */
+  private void addUserIdentifiers(JobConf job) throws IOException {
+    Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
+    for (Map.Entry<String,String> kv : job) {
+      Matcher m = x.matcher(kv.getKey());
+      if (m.matches()) {
+        try {
+          Parser.CNode.addIdentifier(m.group(1),
+              job.getClass(m.group(0), null, ComposableRecordReader.class));
+        } catch (NoSuchMethodException e) {
+          throw (IOException)new IOException(
+              "Invalid define for " + m.group(1)).initCause(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify that this composite has children and that all its children
+   * can validate their input.
+   */
+  public void validateInput(JobConf job) throws IOException {
+    setFormat(job);
+    root.validateInput(job);
+  }
+
+  /**
+   * Build a CompositeInputSplit from the child InputFormats by assigning the
+   * ith split from each child to the ith composite split.
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    setFormat(job);
+    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    return root.getSplits(job, numSplits);
+  }
+
+  /**
+   * Construct a CompositeRecordReader for the children of this InputFormat
+   * as defined in the init expression.
+   * The outermost join need only be composable, not necessarily a composite.
+   * Mandating TupleWritable isn't strictly correct.
+   */
+  @SuppressWarnings("unchecked") // child types unknown
+  public ComposableRecordReader<K,TupleWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    setFormat(job);
+    return root.getRecordReader(split, job, reporter);
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given InputFormat class (inf), path (p) return:
+   * {@code tbl(<inf>, <p>) }
+   */
+  public static String compose(Class<? extends InputFormat> inf, String path) {
+    return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given operation (op), Object class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      String... path) {
+    final String infname = inf.getName();
+    StringBuffer ret = new StringBuffer(op + '(');
+    for (String p : path) {
+      compose(infname, p, ret);
+      ret.append(',');
+    }
+    ret.setCharAt(ret.length() - 1, ')');
+    return ret.toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given operation (op), Object class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      Path... path) {
+    ArrayList<String> tmp = new ArrayList<String>(path.length);
+    for (Path p : path) {
+      tmp.add(p.toString());
+    }
+    return compose(op, inf, tmp.toArray(new String[0]));
+  }
+
+  private static StringBuffer compose(String inf, String path,
+      StringBuffer sb) {
+    sb.append("tbl(" + inf + ",\"");
+    sb.append(path);
+    sb.append("\")");
+    return sb;
+  }
+
+}
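
Concretely, the compose helpers emit expressions in the grammar above,
and user-defined join types become identifiers in that grammar. A sketch
(MyJoinRecordReader and the paths are hypothetical):

    // yields: inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat,"/data/a"),
    //                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat,"/data/b"))
    String expr = CompositeInputFormat.compose(
        "inner", SequenceFileInputFormat.class, "/data/a", "/data/b");
    job.set("mapred.join.expr", expr);

    // Register a custom join type; "myjoin(...)" is then legal in the expression.
    job.set("mapred.join.define.myjoin", MyJoinRecordReader.class.getName());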

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
+ * into this collection must have a public default constructor.
+ */
+public class CompositeInputSplit implements InputSplit {
+
+  private int fill = 0;
+  private long totsize = 0L;
+  private InputSplit[] splits;
+
+  public CompositeInputSplit() { }
+
+  public CompositeInputSplit(int capacity) {
+    splits = new InputSplit[capacity];
+  }
+
+  /**
+   * Add an InputSplit to this collection.
+   * @throws IOException If capacity was not specified during construction
+   *                     or if capacity has been reached.
+   */
+  public void add(InputSplit s) throws IOException {
+    if (null == splits) {
+      throw new IOException("Uninitialized InputSplit");
+    }
+    if (fill == splits.length) {
+      throw new IOException("Too many splits");
+    }
+    splits[fill++] = s;
+    totsize += s.getLength();
+  }
+
+  /**
+   * Get ith child InputSplit.
+   */
+  public InputSplit get(int i) {
+    return splits[i];
+  }
+
+  /**
+   * Return the aggregate length of all child InputSplits currently added.
+   */
+  public long getLength() throws IOException {
+    return totsize;
+  }
+
+  /**
+   * Get the length of ith child InputSplit.
+   */
+  public long getLength(int i) throws IOException {
+    return splits[i].getLength();
+  }
+
+  /**
+   * Collect a set of hosts from all child InputSplits.
+   */
+  public String[] getLocations() throws IOException {
+    HashSet<String> hosts = new HashSet<String>();
+    for (InputSplit s : splits) {
+      String[] hints = s.getLocations();
+      if (hints != null && hints.length > 0) {
+        for (String host : hints) {
+          hosts.add(host);
+        }
+      }
+    }
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
+  /**
+   * getLocations from ith InputSplit.
+   */
+  public String[] getLocation(int i) throws IOException {
+    return splits[i].getLocations();
+  }
+
+  /**
+   * Write splits in the following format.
+   * {@code
+   * <count><class1><class2>...<classn><split1><split2>...<splitn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, splits.length);
+    for (InputSplit s : splits) {
+      Text.writeString(out, s.getClass().getName());
+    }
+    for (InputSplit s : splits) {
+      s.write(out);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws IOException If the child InputSplit cannot be read, typically
+   *                     for failing access checks.
+   */
+  @SuppressWarnings("unchecked")  // Explicit check for split class agreement
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    if (splits == null || splits.length != card) {
+      splits = new InputSplit[card];
+    }
+    Class<? extends InputSplit>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] =
+          Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        splits[i] = (InputSplit) ReflectionUtils.newInstance(cls[i], null);
+        splits[i].readFields(in);
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException)new IOException("Failed split init").initCause(e);
+    }
+  }
+
+}
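
A sketch of the serialization round trip (FileSplit stands in for any
child split with a public default constructor; the paths and the assumed
FileSplit constructor are illustrative only):

    CompositeInputSplit split = new CompositeInputSplit(2);
    split.add(new FileSplit(new Path("/data/a/part-0"), 0L, 1024L, job));
    split.add(new FileSplit(new Path("/data/b/part-0"), 0L, 2048L, job));

    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);                 // <count><class1><class2><split1><split2>

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    CompositeInputSplit copy = new CompositeInputSplit();
    copy.readFields(in);              // children re-instantiated reflectively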

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A RecordReader that can effect joins of RecordReaders sharing a common key
+ * type and partitioning.
+ */
+public abstract class CompositeRecordReader<
+    K extends WritableComparable, // key type
+    V extends Writable,           // accepts RecordReader<K,V> as children
+    X extends Writable>           // emits Writables of this type
+    implements Configurable {
+
+
+  private int id;
+  private Configuration conf;
+  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
+
+  private WritableComparator cmp;
+  private Class<? extends WritableComparable> keyclass;
+  private PriorityQueue<ComposableRecordReader<K,?>> q;
+
+  protected final JoinCollector jc;
+  protected final ComposableRecordReader<K,? extends V>[] kids;
+
+  protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+  /**
+   * Create a RecordReader with <tt>capacity</tt> children to position
+   * <tt>id</tt> in the parent reader.
+   * The id of a root CompositeRecordReader is -1 by convention, but relying
+   * on this is not recommended.
+   */
+  @SuppressWarnings("unchecked") // Generic array assignment
+  public CompositeRecordReader(int id, int capacity,
+      Class<? extends WritableComparator> cmpcl)
+      throws IOException {
+    assert capacity > 0 : "Invalid capacity";
+    this.id = id;
+    if (null != cmpcl) {
+      cmp = (WritableComparator)ReflectionUtils.newInstance(cmpcl, null);
+      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+          new Comparator<ComposableRecordReader<K,?>>() {
+            public int compare(ComposableRecordReader<K,?> o1,
+                               ComposableRecordReader<K,?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    jc = new JoinCollector(capacity);
+    kids = new ComposableRecordReader[capacity];
+  }
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return sorted list of RecordReaders for this composite.
+   */
+  protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
+    return q;
+  }
+
+  /**
+   * Return comparator defining the ordering for RecordReaders in this
+   * composite.
+   */
+  protected WritableComparator getComparator() {
+    return cmp;
+  }
+
+  /**
+   * Add a RecordReader to this collection.
+   * The id() of a RecordReader determines where in the Tuple its
+   * entry will appear. Adding RecordReaders with the same id has
+   * undefined behavior.
+   */
+  public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
+    kids[rr.id()] = rr;
+    if (null == q) {
+      cmp = WritableComparator.get(rr.createKey().getClass());
+      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+          new Comparator<ComposableRecordReader<K,?>>() {
+            public int compare(ComposableRecordReader<K,?> o1,
+                               ComposableRecordReader<K,?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    q.add(rr);
+  }
+
+  /**
+   * Collector for join values.
+   * This accumulates values for a given key from the child RecordReaders. If
+   * one or more child RR contain duplicate keys, this will emit the cross
+   * product of the associated values until exhausted.
+   */
+  class JoinCollector {
+    private K key;
+    private ResetableIterator<X>[] iters;
+    private long partial = 0L;
+    private long replaymask = 0L;
+    private int start = 0;
+    private int pos = -1;
+    private int iterpos = -1;
+    private boolean first = true;
+
+    /**
+     * Construct a collector capable of handling the specified number of
+     * children.
+     */
+    @SuppressWarnings("unchecked") // Generic array assignment
+    public JoinCollector(int card) {
+      iters = new ResetableIterator[card];
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i] = EMPTY;
+      }
+    }
+
+    /**
+     * Register a given iterator at position id.
+     */
+    public void add(int id, ResetableIterator<X> i)
+        throws IOException {
+      iters[id] = i;
+    }
+
+    /**
+     * Return the key associated with this collection.
+     */
+    public K key() {
+      return key;
+    }
+
+    /**
+     * Codify the contents of the collector to be iterated over.
+     * When this is called, all RecordReaders registered for this
+     * key should have added ResetableIterators.
+     */
+    public void reset(K key) {
+      this.key = key;
+      start = 0;
+      pos = 0;
+      first = true;
+      partial = 0L;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].reset();
+      }
+    }
+
+    /**
+     * Clear all state information.
+     */
+    public void clear() {
+      key = null;
+      pos = -1;
+      first = true;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].clear();
+        iters[i] = EMPTY;
+      }
+      partial = 0L;
+    }
+
+    /**
+     * Returns false if exhausted or if reset(K) has not been called.
+     */
+    protected boolean hasNext() {
+      return !(pos < 0);
+    }
+
+    /**
+     * Populate Tuple from iterators.
+     * It should be the case that, given iterators i_1...i_n over values from
+     * sources s_1...s_n sharing key k, repeated calls to next yield the full
+     * cross product i_1 x i_2 x ... x i_n.
+     */
+    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+    protected boolean next(TupleWritable val) throws IOException {
+      if (pos < 0) {
+        clear();
+        return false;
+      }
+      int i = start;
+      if (first) { // Find first iterator with elements
+        for (; i < iters.length && !iters[i].hasNext(); ++i);
+        if (iters.length <= i) { // no children had key
+          clear();
+          return false;
+        }
+        start = i;
+        for (int j = i; j < iters.length; ++j) {
+          if (iters[j].hasNext()) {
+            partial |= 1L << j;
+          }
+        }
+        iterpos = pos = iters.length - 1;
+        first = false;
+      } else { // Copy all elements in partial into tuple
+        for (; i < iterpos; ++i) {
+          if ((partial & (1L << i)) != 0) {
+            iters[i].replay((X)val.get(i));
+            val.setWritten(i);
+          }
+        }
+      }
+      long partialwritten = val.mask();
+      if (iters[i].next((X)val.get(i))) {
+        val.setWritten(i);
+      }
+      for (++i; i < iters.length; ++i) {
+        iters[i].reset();
+        if (iters[i].hasNext() && iters[i].next((X)val.get(i))) {
+          val.setWritten(i);
+        }
+      }
+      iterpos = iters.length - 1;
+      for (; iterpos > pos && !iters[iterpos].hasNext(); --iterpos);
+      if (!iters[iterpos].hasNext()) {
+        for (; !(pos < 0 || iters[pos].hasNext()); --pos);
+        iterpos = pos;
+      }
+      replaymask = val.mask();
+      if ((replaymask ^ partialwritten) == 0L) {
+        return next(val);
+      }
+      return true;
+    }
+
+    /**
+     * Replay the last Tuple emitted.
+     */
+    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+    public void replay(TupleWritable val) throws IOException {
+      // The last emitted tuple might have drawn on an empty source;
+      // it can't be cleared prematurely, b/c there may be more duplicate
+      // keys in iterator positions < pos
+      if (first) {
+        throw new IllegalStateException();
+      }
+      for (int i = 0; i < iters.length; ++i) {
+        if ((replaymask & (1L << i)) != 0) {
+          iters[i].replay((X)val.get(i));
+        }
+      }
+    }
+
+    /**
+     * Close all child iterators.
+     */
+    public void close() throws IOException {
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].close();
+      }
+    }
+
+    /**
+     * Write the next value into key, value as accepted by the operation
+     * associated with this set of RecordReaders.
+     */
+    public boolean flush(TupleWritable value) throws IOException {
+      while (hasNext()) {
+        value.clearWritten();
+        if (next(value) && combine(kids, value)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Return the key for the current join or the value at the top of the
+   * RecordReader heap.
+   */
+  public K key() {
+    if (jc.hasNext()) {
+      return jc.key();
+    }
+    if (!q.isEmpty()) {
+      return q.peek().key();
+    }
+    return null;
+  }
+
+  /**
+   * Clone the key at the top of this RR into the given object.
+   */
+  public void key(K key) throws IOException {
+    WritableUtils.cloneInto(key, key());
+  }
+
+  /**
+   * Return true if it is possible that this could emit more values.
+   */
+  public boolean hasNext() {
+    return jc.hasNext() || !q.isEmpty();
+  }
+
+  /**
+   * Pass skip key to child RRs.
+   */
+  public void skip(K key) throws IOException {
+    ArrayList<ComposableRecordReader<K,?>> tmp =
+      new ArrayList<ComposableRecordReader<K,?>>();
+    while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
+      tmp.add(q.poll());
+    }
+    for (ComposableRecordReader<K,?> rr : tmp) {
+      rr.skip(key);
+      q.add(rr);
+    }
+  }
+
+  /**
+   * Obtain an iterator over the child RRs apropos of the value type
+   * ultimately emitted from this join.
+   */
+  protected abstract ResetableIterator<X> getDelegate();
+
+  /**
+   * If key provided matches that of this Composite, give JoinCollector
+   * iterator over values it may emit.
+   */
+  @SuppressWarnings("unchecked") // No values from static EMPTY class
+  public void accept(CompositeRecordReader.JoinCollector jc, K key)
+      throws IOException {
+    if (hasNext() && 0 == cmp.compare(key, key())) {
+      fillJoinCollector(createKey());
+      jc.add(id, getDelegate());
+      return;
+    }
+    jc.add(id, EMPTY);
+  }
+
+  /**
+   * For all child RRs offering the key provided, obtain an iterator
+   * at that position in the JoinCollector.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    if (!q.isEmpty()) {
+      q.peek().key(iterkey);
+      while (0 == cmp.compare(q.peek().key(), iterkey)) {
+        ComposableRecordReader<K,?> t = q.poll();
+        t.accept(jc, iterkey);
+        if (t.hasNext()) {
+          q.add(t);
+        } else if (q.isEmpty()) {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Implement Comparable contract (compare key of join or head of heap
+   * with that of another).
+   */
+  public int compareTo(ComposableRecordReader<K,?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Create a new key value common to all child RRs.
+   * @throws ClassCastException if key classes differ.
+   */
+  @SuppressWarnings("unchecked") // Explicit check for key class agreement
+  public K createKey() {
+    if (null == keyclass) {
+      final Class<?> cls = kids[0].createKey().getClass();
+      for (RecordReader<K,? extends Writable> rr : kids) {
+        if (!cls.equals(rr.createKey().getClass())) {
+          throw new ClassCastException("Child key classes fail to agree");
+        }
+      }
+      keyclass = cls.asSubclass(WritableComparable.class);
+    }
+    return (K) ReflectionUtils.newInstance(keyclass, getConf());
+  }
+
+  /**
+   * Create a value to be used internally for joins.
+   */
+  protected TupleWritable createInternalValue() {
+    Writable[] vals = new Writable[kids.length];
+    for (int i = 0; i < vals.length; ++i) {
+      vals[i] = kids[i].createValue();
+    }
+    return new TupleWritable(vals);
+  }
+
+  /**
+   * Unsupported (returns zero in all cases).
+   */
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  /**
+   * Close all child RRs.
+   */
+  public void close() throws IOException {
+    if (kids != null) {
+      for (RecordReader<K,? extends Writable> rr : kids) {
+        rr.close();
+      }
+    }
+    if (jc != null) {
+      jc.close();
+    }
+  }
+
+  /**
+   * Report progress as the minimum of all child RR progress.
+   */
+  public float getProgress() throws IOException {
+    float ret = 1.0f;
+    for (RecordReader<K,? extends Writable> rr : kids) {
+      ret = Math.min(ret, rr.getProgress());
+    }
+    return ret;
+  }
+}
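
To make the JoinCollector's contract concrete: if, for key k, child 0
holds values {a1, a2} and child 1 holds {b1, b2}, successive calls to
next(TupleWritable) yield the cross product

    (a1, b1), (a1, b2), (a2, b1), (a2, b2)

while a child with no values for k leaves its tuple position unwritten;
the join type then decides, in combine, whether each tuple is emitted.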

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full inner join.
+ */
+public class InnerJoinRecordReader<K extends WritableComparable>
+    extends JoinRecordReader<K> {
+
+  InnerJoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Return true iff the tuple is full (all data sources contain this key).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    for (int i = 0; i < srcs.length; ++i) {
+      if (!dst.has(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Base class for Composite joins returning Tuples of arbitrary Writables.
+ */
+public abstract class JoinRecordReader<K extends WritableComparable>
+    extends CompositeRecordReader<K,Writable,TupleWritable>
+    implements ComposableRecordReader<K,TupleWritable> {
+
+  public JoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * Emit the next set of key, value pairs as defined by the child
+   * RecordReaders and operation associated with this composite RR.
+   */
+  public boolean next(K key, TupleWritable value) throws IOException {
+    if (jc.flush(value)) {
+      WritableUtils.cloneInto(key, jc.key());
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (jc.flush(value)) {
+        WritableUtils.cloneInto(key, jc.key());
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public TupleWritable createValue() {
+    return createInternalValue();
+  }
+
+  /**
+   * Return an iterator wrapping the JoinCollector.
+   */
+  protected ResetableIterator<TupleWritable> getDelegate() {
+    return new JoinDelegationIterator();
+  }
+
+  /**
+   * Since the JoinCollector is effecting our operation, we need only
+   * provide an iterator proxy wrapping its operation.
+   */
+  protected class JoinDelegationIterator
+      implements ResetableIterator<TupleWritable> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(TupleWritable val) throws IOException {
+      return jc.flush(val);
+    }
+
+    public void replay(TupleWritable val) throws IOException {
+      jc.replay(val);
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(TupleWritable item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+}
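
On the consuming side, a map task receives the joined values as a
TupleWritable. A hedged sketch of a mapper body (key and value types as
configured in the example driver; the Text output is illustrative):

    public void map(BytesWritable key, TupleWritable value,
        OutputCollector<BytesWritable, Text> out, Reporter reporter)
        throws IOException {
      if (value.has(0) && value.has(1)) {    // both sources present
        out.collect(key, new Text(value.get(0) + "\t" + value.get(1)));
      }
    }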

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Base class for Composite join returning values derived from multiple
+ * sources, but generally not tuples.
+ */
+public abstract class MultiFilterRecordReader<K extends WritableComparable,
+                                              V extends Writable>
+    extends CompositeRecordReader<K,V,V>
+    implements ComposableRecordReader<K,V> {
+
+  private Class<? extends Writable> valueclass;
+  private TupleWritable ivalue;
+
+  public MultiFilterRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * For each tuple emitted, return a value (typically one of the values
+   * in the tuple).
+   * Modifying the Writables in the tuple is permitted and unlikely to affect
+   * join behavior in most cases, but it is not recommended. It's safer to
+   * clone first.
+   */
+  protected abstract V emit(TupleWritable dst) throws IOException;
+
+  /**
+   * The default implementation offers every Tuple from the collector
+   * (the outer join of child RRs) to {@link #emit}.
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  public boolean next(K key, V value) throws IOException {
+    if (jc.flush(ivalue)) {
+      WritableUtils.cloneInto(key, jc.key());
+      WritableUtils.cloneInto(value, emit(ivalue));
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (jc.flush(ivalue)) {
+        WritableUtils.cloneInto(key, jc.key());
+        WritableUtils.cloneInto(value, emit(ivalue));
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked") // Explicit check for value class agreement
+  public V createValue() {
+    if (null == valueclass) {
+      final Class<?> cls = kids[0].createValue().getClass();
+      for (RecordReader<K,? extends V> rr : kids) {
+        if (!cls.equals(rr.createValue().getClass())) {
+          throw new ClassCastException("Child value classes fail to agree");
+        }
+      }
+      valueclass = cls.asSubclass(Writable.class);
+      ivalue = createInternalValue();
+    }
+    return (V) ReflectionUtils.newInstance(valueclass, null);
+  }
+
+  /**
+   * Return an iterator returning a single value from the tuple.
+   * @see MultiFilterDelegationIterator
+   */
+  protected ResetableIterator<V> getDelegate() {
+    return new MultiFilterDelegationIterator();
+  }
+
+  /**
+   * Proxy the JoinCollector, but include a callback to {@link #emit}.
+   */
+  protected class MultiFilterDelegationIterator
+      implements ResetableIterator<V> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(V val) throws IOException {
+      boolean ret = jc.flush(ivalue);
+      if (ret) {
+        WritableUtils.cloneInto(val, emit(ivalue));
+      }
+      return ret;
+    }
+
+    public void replay(V val) throws IOException {
+      WritableUtils.cloneInto(val, emit(ivalue));
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(V item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+
+}
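
To make the emit contract concrete, a hedged sketch of a trivial subclass;
the class name is hypothetical, and the body mirrors the pattern
OverrideRecordReader uses below:

    // Hypothetical subclass: forward the first value set in each tuple.
    public class FirstValueRecordReader<K extends WritableComparable,
                                        V extends Writable>
        extends MultiFilterRecordReader<K,V> {

      public FirstValueRecordReader(int id, JobConf conf, int capacity,
          Class<? extends WritableComparator> cmpcl) throws IOException {
        super(id, conf, capacity, cmpcl);
      }

      @SuppressWarnings("unchecked") // no static type info on tuples
      protected V emit(TupleWritable dst) {
        return (V) dst.iterator().next();
      }
    }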

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full outer join.
+ */
+public class OuterJoinRecordReader<K extends WritableComparable>
+    extends JoinRecordReader<K> {
+
+  OuterJoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Emit everything from the collector.
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    return true;
+  }
+}
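
As a hedged illustration of the outer-join semantics, with invented data: if
source S1 holds (k1,a) and (k2,b) while source S2 holds only (k2,c), the
reader emits one tuple per key seen in any source, leaving the positions of
absent sources unset:

    k1 -> [a, ]     // S2 contributed nothing for k1
    k2 -> [b, c]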

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Prefer the &quot;rightmost&quot; data source for this key.
+ * For example, <tt>override(S1,S2,S3)</tt> will prefer values
+ * from S3 over S2, and values from S2 over S1 for all keys
+ * emitted from all sources.
+ */
+public class OverrideRecordReader<K extends WritableComparable,
+                                  V extends Writable>
+    extends MultiFilterRecordReader<K,V> {
+
+  OverrideRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Emit the value with the highest position in the tuple.
+   */
+  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+  protected V emit(TupleWritable dst) {
+    return (V) dst.iterator().next();
+  }
+
+  /**
+   * Instead of filling the JoinCollector with iterators from all
+   * data sources, fill only the rightmost for this key.
+   * This not only saves space by discarding the other sources, but
+   * it also emits only as many key-value pairs as the preferred
+   * RecordReader holds, instead of repeating that stream n times, where
+   * n is the cardinality of the cross product of the discarded
+   * streams for the given key.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    if (!q.isEmpty()) {
+      int highpos = -1;
+      ArrayList<ComposableRecordReader<K,?>> list =
+        new ArrayList<ComposableRecordReader<K,?>>(kids.length);
+      q.peek().key(iterkey);
+      final WritableComparator cmp = getComparator();
+      while (0 == cmp.compare(q.peek().key(), iterkey)) {
+        ComposableRecordReader<K,?> t = q.poll();
+        if (-1 == highpos || list.get(highpos).id() < t.id()) {
+          highpos = list.size();
+        }
+        list.add(t);
+        if (q.isEmpty())
+          break;
+      }
+      ComposableRecordReader<K,?> t = list.remove(highpos);
+      t.accept(jc, iterkey);
+      for (ComposableRecordReader<K,?> rr : list) {
+        rr.skip(iterkey);
+      }
+      list.add(t);
+      for (ComposableRecordReader<K,?> rr : list) {
+        if (rr.hasNext()) {
+          q.add(rr);
+        }
+      }
+    }
+  }
+
+}
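
A small worked example of the override semantics, with invented data: given
override(S1,S2) where S1 holds (k1,a) and (k2,b) and S2 holds (k2,c), the
reader emits:

    k1 -> a     // only S1 has the key
    k2 -> c     // S2 is rightmost, so its value wins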

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.CharArrayReader;
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Very simple shift-reduce parser for join expressions.
+ *
+ * This should be sufficient for the user extension permitted now, but ought to
+ * be replaced with a parser generator if more complex grammars are supported.
+ * In particular, this &quot;shift-reduce&quot; parser has no states. Each set
+ * of formals requires a different internal node type, which is responsible for
+ * interpreting the list of tokens it receives. This is sufficient for the
+ * current grammar, but it has several annoying properties that might inhibit
+ * extension. In particular, parentheses always denote function calls; an
+ * algebraic or filter grammar would require not only a new node type, but
+ * would also have to work around the internals of this parser.
+ *
+ * For most other cases, adding classes to the hierarchy, particularly by
+ * extending JoinRecordReader and MultiFilterRecordReader, is fairly
+ * straightforward. One need only override the relevant method(s) (usually only
+ * {@link CompositeRecordReader#combine}) and include a property to map its
+ * value to an identifier in the parser.
+ */
+public class Parser {
+  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
+
+  /**
+   * Tagged-union type for tokens from the join expression.
+   * @see Parser.TType
+   */
+  public static class Token {
+
+    private TType type;
+
+    Token(TType type) {
+      this.type = type;
+    }
+
+    public TType getType() { return type; }
+    public Node getNode() throws IOException {
+      throw new IOException("Expected nodetype");
+    }
+    public double getNum() throws IOException {
+      throw new IOException("Expected numtype");
+    }
+    public String getStr() throws IOException {
+      throw new IOException("Expected strtype");
+    }
+  }
+
+  public static class NumToken extends Token {
+    private double num;
+    public NumToken(double num) {
+      super(TType.NUM);
+      this.num = num;
+    }
+    public double getNum() { return num; }
+  }
+
+  public static class NodeToken extends Token {
+    private Node node;
+    NodeToken(Node node) {
+      super(TType.CIF);
+      this.node = node;
+    }
+    public Node getNode() {
+      return node;
+    }
+  }
+
+  public static class StrToken extends Token {
+    private String str;
+    public StrToken(TType type, String str) {
+      super(type);
+      this.str = str;
+    }
+    public String getStr() {
+      return str;
+    }
+  }
+
+  /**
+   * Simple lexer wrapping a StreamTokenizer.
+   * This encapsulates the creation of tagged-union Tokens and initializes the
+   * StreamTokenizer.
+   */
+  private static class Lexer {
+
+    private StreamTokenizer tok;
+
+    Lexer(String s) {
+      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
+      tok.quoteChar('"');
+      tok.parseNumbers();
+      tok.ordinaryChar(',');
+      tok.ordinaryChar('(');
+      tok.ordinaryChar(')');
+    }
+
+    Token next() throws IOException {
+      int type = tok.nextToken();
+      switch (type) {
+        case StreamTokenizer.TT_EOF:
+        case StreamTokenizer.TT_EOL:
+          return null;
+        case StreamTokenizer.TT_NUMBER:
+          return new NumToken(tok.nval);
+        case StreamTokenizer.TT_WORD:
+          return new StrToken(TType.IDENT, tok.sval);
+        case '"':
+          return new StrToken(TType.QUOT, tok.sval);
+        default:
+          switch (type) {
+            case ',':
+              return new Token(TType.COMMA);
+            case '(':
+              return new Token(TType.LPAREN);
+            case ')':
+              return new Token(TType.RPAREN);
+            default:
+              throw new IOException("Unexpected: " + type);
+          }
+      }
+    }
+  }
+
+  public abstract static class Node implements ComposableInputFormat {
+    /**
+     * Return the node type registered for the particular identifier.
+     * By default, this is a CNode for any composite node and a WNode
+     * for &quot;wrapped&quot; nodes. User nodes will likely be composite
+     * nodes.
+     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
+     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
+     */
+    static Node forIdent(String ident) throws IOException {
+      try {
+        if (!nodeCstrMap.containsKey(ident)) {
+          throw new IOException("No nodetype for " + ident);
+        }
+        return nodeCstrMap.get(ident).newInstance(ident);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    private static final Class<?>[] ncstrSig = { String.class };
+    private static final
+        Map<String,Constructor<? extends Node>> nodeCstrMap =
+        new HashMap<String,Constructor<? extends Node>>();
+    protected static final
+        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
+        new HashMap<String,Constructor<? extends ComposableRecordReader>>();
+
+    /**
+     * For a given identifier, add a mapping to the nodetype for the parse
+     * tree and to the ComposableRecordReader to be created, including the
+     * formals required to invoke the constructor.
+     * The nodetype and constructor signature should be filled in from the
+     * child node.
+     */
+    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
+                              Class<? extends Node> nodetype,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Constructor<? extends Node> ncstr =
+        nodetype.getDeclaredConstructor(ncstrSig);
+      ncstr.setAccessible(true);
+      nodeCstrMap.put(ident, ncstr);
+      Constructor<? extends ComposableRecordReader> mcstr =
+        cl.getDeclaredConstructor(mcstrSig);
+      mcstr.setAccessible(true);
+      rrCstrMap.put(ident, mcstr);
+    }
+
+    // inst
+    protected int id = -1;
+    protected String ident;
+    protected Class<? extends WritableComparator> cmpcl;
+
+    protected Node(String ident) {
+      this.ident = ident;
+    }
+
+    protected void setID(int id) {
+      this.id = id;
+    }
+
+    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      this.cmpcl = cmpcl;
+    }
+    abstract void parse(List<Token> args) throws IOException;
+  }
+
+  /**
+   * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
+   */
+  static class WNode extends Node {
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, RecordReader.class, Class.class };
+
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
+    }
+
+    private String indir;
+    private InputFormat inf;
+
+    public WNode(String ident) {
+      super(ident);
+    }
+
+    /**
+     * Let the first actual parameter define the InputFormat and the second
+     * define the <tt>mapred.input.dir</tt> property.
+     */
+    public void parse(List<Token> ll) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      Iterator<Token> i = ll.iterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        if (TType.COMMA.equals(t.getType())) {
+          try {
+            inf = (InputFormat)ReflectionUtils.newInstance(
+                Class.forName(sb.toString()).asSubclass(InputFormat.class),
+                null);
+          } catch (ClassNotFoundException e) {
+            throw (IOException)new IOException().initCause(e);
+          } catch (IllegalArgumentException e) {
+            throw (IOException)new IOException().initCause(e);
+          }
+          break;
+        }
+        sb.append(t.getStr());
+      }
+      if (!i.hasNext()) {
+        throw new IOException("Parse error");
+      }
+      Token t = i.next();
+      if (!TType.QUOT.equals(t.getType())) {
+        throw new IOException("Expected quoted string");
+      }
+      indir = t.getStr();
+      // no check for ll.isEmpty() to permit extension
+    }
+
+    private JobConf getConf(JobConf job) {
+      JobConf conf = new JobConf(job);
+      conf.setInputPath(new Path(indir));
+      return conf;
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      inf.validateInput(getConf(job));
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      return inf.getSplits(getConf(job), numSplits);
+    }
+
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        return rrCstrMap.get(ident).newInstance(id,
+            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    public String toString() {
+      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
+    }
+  }
+
+  /**
+   * Internal nodetype for &quot;composite&quot; InputFormats.
+   */
+  static class CNode extends Node {
+
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
+
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
+    }
+
+    // inst
+    private ArrayList<Node> kids = new ArrayList<Node>();
+
+    public CNode(String ident) {
+      super(ident);
+    }
+
+    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      super.setKeyComparator(cmpcl);
+      for (Node n : kids) {
+        n.setKeyComparator(cmpcl);
+      }
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      if (0 == kids.size()) {
+        throw new IOException("Childless composite");
+      }
+      for (Node n : kids) {
+        n.validateInput(job);
+      }
+    }
+
+    /**
+     * Combine InputSplits from child InputFormats into a
+     * {@link CompositeInputSplit}.
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      InputSplit[][] splits = new InputSplit[kids.size()][];
+      for (int i = 0; i < kids.size(); ++i) {
+        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+        if (null == tmp) {
+          throw new IOException("Error gathering splits from child RReader");
+        }
+        if (i > 0 && splits[i-1].length != tmp.length) {
+          throw new IOException("Inconsistent split cardinality from child");
+        }
+        splits[i] = tmp;
+      }
+      final int size = splits[0].length;
+      CompositeInputSplit[] ret = new CompositeInputSplit[size];
+      for (int i = 0; i < size; ++i) {
+        ret[i] = new CompositeInputSplit(splits.length);
+        for (int j = 0; j < splits.length; ++j) {
+          ret[i].add(splits[j][i]);
+        }
+      }
+      return ret;
+    }
+
+    @SuppressWarnings("unchecked") // child types unknowable
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      if (!(split instanceof CompositeInputSplit)) {
+        throw new IOException("Invalid split type:" +
+                              split.getClass().getName());
+      }
+      final CompositeInputSplit spl = (CompositeInputSplit)split;
+      final int capacity = kids.size();
+      CompositeRecordReader ret = null;
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        ret = (CompositeRecordReader)
+          rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+      for (int i = 0; i < capacity; ++i) {
+        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
+      }
+      return (ComposableRecordReader)ret;
+    }
+
+    /**
+     * Parse a list of comma-separated nodes.
+     */
+    public void parse(List<Token> args) throws IOException {
+      ListIterator<Token> i = args.listIterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        t.getNode().setID(i.previousIndex() >> 1);
+        kids.add(t.getNode());
+        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
+          throw new IOException("Expected ','");
+        }
+      }
+    }
+
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(ident + "(");
+      for (Node n : kids) {
+        sb.append(n.toString() + ",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');
+      return sb.toString();
+    }
+  }
+
+  private static Token reduce(Stack<Token> st) throws IOException {
+    LinkedList<Token> args = new LinkedList<Token>();
+    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
+      args.addFirst(st.pop());
+    }
+    if (st.isEmpty()) {
+      throw new IOException("Unmatched ')'");
+    }
+    st.pop();
+    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
+      throw new IOException("Identifier expected");
+    }
+    Node n = Node.forIdent(st.pop().getStr());
+    n.parse(args);
+    return new NodeToken(n);
+  }
+
+  /**
+   * Given an expression and an optional comparator, build a tree of
+   * InputFormats using the comparator to sort keys.
+   */
+  static Node parse(String expr, Class<? extends WritableComparator> cmpcl)
+      throws IOException {
+    if (null == expr) {
+      throw new IOException("Expression is null");
+    }
+    Lexer lex = new Lexer(expr);
+    Stack<Token> st = new Stack<Token>();
+    Token tok;
+    while ((tok = lex.next()) != null) {
+      if (TType.RPAREN.equals(tok.getType())) {
+        st.push(reduce(st));
+      } else {
+        st.push(tok);
+      }
+    }
+    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
+      Node ret = st.pop().getNode();
+      if (cmpcl != null) {
+        ret.setKeyComparator(cmpcl);
+      }
+      return ret;
+    }
+    throw new IOException("Missing ')'");
+  }
+
+}
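
To illustrate the extension path described in the class comment, a hedged
sketch of mapping a new identifier to a custom reader; the
"mapred.join.define" property follows the convention CompositeInputFormat
scans for user identifiers, and it, the class, and the expression below are
assumptions for illustration only:

    // Hypothetical reader backing a new composite identifier "mostrecent".
    public class MostRecentRecordReader<K extends WritableComparable,
                                        V extends Writable>
        extends MultiFilterRecordReader<K,V> {
      MostRecentRecordReader(int id, JobConf conf, int capacity,
          Class<? extends WritableComparator> cmpcl) throws IOException {
        super(id, conf, capacity, cmpcl);
      }
      @SuppressWarnings("unchecked")
      protected V emit(TupleWritable dst) {
        return (V) dst.iterator().next();
      }
    }

    // Map the identifier to the class, then use it in a join expression.
    job.set("mapred.join.define.mostrecent",
            MostRecentRecordReader.class.getName());
    job.set("mapred.join.expr",
        "mostrecent(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat," +
        "\"/data/a\"),tbl(org.apache.hadoop.mapred.SequenceFileInputFormat," +
        "\"/data/b\"))");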

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This defines an interface to a stateful Iterator that can replay elements
+ * added to it directly.
+ * Note that this does not extend {@link java.util.Iterator}.
+ */
+public interface ResetableIterator<T extends Writable> {
+
+  public static class EMPTY<U extends Writable>
+    implements ResetableIterator<U> {
+    public boolean hasNext() { return false; }
+    public void reset() { }
+    public void close() throws IOException { }
+    public void clear() { }
+    public boolean next(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    public void replay(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    public void add(U item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * True iff a call to next will succeed.
+   */
+  public boolean hasNext();
+
+  /**
+   * Assign the next value to the actual parameter.
+   * It is required that elements added to a ResetableIterator be returned in
+   * the same order after a call to {@link #reset} (FIFO).
+   *
+   * Note that a call to this method may fail for nested joins (i.e., more
+   * elements are available, but none satisfy the constraints of the join).
+   */
+  public boolean next(T val) throws IOException;
+
+  /**
+   * Assign the last value returned to the actual parameter.
+   */
+  public void replay(T val) throws IOException;
+
+  /**
+   * Set iterator to return to the start of its range. Must be called after
+   * calling {@link #add} to avoid a ConcurrentModificationException.
+   */
+  public void reset();
+
+  /**
+   * Add an element to the collection of elements to iterate over.
+   */
+  public void add(T item) throws IOException;
+
+  /**
+   * Close datasources and release resources. Calling methods on the iterator
+   * after calling close has undefined behavior.
+   */
+  // XXX is this necessary?
+  public void close() throws IOException;
+
+  /**
+   * Close datasources, but do not release internal resources. Calling this
+   * method should permit the object to be reused with a different datasource.
+   */
+  public void clear();
+
+}
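
To make the add/reset/next protocol concrete, a hedged usage sketch assuming
the list-backed implementation in this package and invented values:

    ResetableIterator<Text> it = new ArrayListBackedIterator<Text>();
    it.add(new Text("a"));
    it.add(new Text("b"));
    it.reset();               // required after add(), per the contract above
    Text val = new Text();
    while (it.next(val)) {    // yields "a", then "b" (FIFO)
      // consume val
    }
    it.reset();               // the same elements replay in order
    it.close();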