You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/12/07 23:01:35 UTC
svn commit: r602240 [1/2] - in /lucene/hadoop/trunk: ./
src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/join/
src/test/org/apache/hadoop/mapred/join/
Author: omalley
Date: Fri Dec 7 14:01:32 2007
New Revision: 602240
URL: http://svn.apache.org/viewvc?rev=602240&view=rev
Log:
HADOOP-2085. A library to support map-side joins of consistently
partitioned and sorted data sets. Contributed by Chris Douglas.
Added:
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 7 14:01:32 2007
@@ -40,7 +40,10 @@
HADOOP-1652. A utility to balance data among datanodes in a HDFS cluster.
(Hairong Kuang via dhruba)
-
+
+ HADOOP-2085. A library to support map-side joins of consistently
+ partitioned and sorted data sets. (Chris Douglas via omalley)
+
IMPROVEMENTS
HADOOP-2045. Change committer list on website to a table, so that
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri Dec 7 14:01:32 2007
@@ -48,6 +48,7 @@
"A map/reduce tile laying program to find solutions to pentomino problems.");
pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
+ pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
pgd.driver(argv);
}
catch(Throwable e){
Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Join.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.join.*;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is a map/reduce example that uses the framework to join sorted,
+ * equally partitioned input datasets on their keys.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar join
+ * [-m <i>maps</i>] [-r <i>reduces</i>]
+ * [-inFormat <i>input format class</i>]
+ * [-outFormat <i>output format class</i>]
+ * [-outKey <i>output key class</i>]
+ * [-outValue <i>output value class</i>]
+ * [-joinOp &lt;inner|outer|override&gt;]
+ * [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i>
+ */
+public class Join extends Configured implements Tool {
+
+ static int printUsage() {
+ System.out.println("join [-m <maps>] [-r <reduces>] " +
+ "[-inFormat <input format class>] " +
+ "[-outFormat <output format class>] " +
+ "[-outKey <output key class>] " +
+ "[-outValue <output value class>] " +
+ "[-joinOp <inner|outer|override>] " +
+ "[input]* <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ /**
+ * The main driver for the join program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there are communication problems with the
+ * job tracker.
+ */
+ public int run(String[] args) throws Exception {
+ JobConf jobConf = new JobConf(getConf(), Sort.class);
+ jobConf.setJobName("join");
+
+ jobConf.setMapperClass(IdentityMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+
+ JobClient client = new JobClient(jobConf);
+ ClusterStatus cluster = client.getClusterStatus();
+ int num_maps = cluster.getTaskTrackers() *
+ jobConf.getInt("test.sort.maps_per_host", 10);
+ int num_reduces = cluster.getTaskTrackers() *
+ jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
+ Class<? extends InputFormat> inputFormatClass =
+ SequenceFileInputFormat.class;
+ Class<? extends OutputFormat> outputFormatClass =
+ SequenceFileOutputFormat.class;
+ Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+ Class<? extends Writable> outputValueClass = TupleWritable.class;
+ String op = "inner";
+ List<String> otherArgs = new ArrayList<String>();
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+ num_maps = Integer.parseInt(args[++i]);
+ } else if ("-r".equals(args[i])) {
+ num_reduces = Integer.parseInt(args[++i]);
+ } else if ("-inFormat".equals(args[i])) {
+ inputFormatClass =
+ Class.forName(args[++i]).asSubclass(InputFormat.class);
+ } else if ("-outFormat".equals(args[i])) {
+ outputFormatClass =
+ Class.forName(args[++i]).asSubclass(OutputFormat.class);
+ } else if ("-outKey".equals(args[i])) {
+ outputKeyClass =
+ Class.forName(args[++i]).asSubclass(WritableComparable.class);
+ } else if ("-outValue".equals(args[i])) {
+ outputValueClass =
+ Class.forName(args[++i]).asSubclass(Writable.class);
+ } else if ("-joinOp".equals(args[i])) {
+ op = args[++i];
+ } else {
+ otherArgs.add(args[i]);
+ }
+ } catch (NumberFormatException except) {
+ System.out.println("ERROR: Integer expected instead of " + args[i]);
+ return printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.out.println("ERROR: Required parameter missing from " +
+ args[i-1]);
+ return printUsage(); // exits
+ }
+ }
+
+ // Set user-supplied (possibly default) job configs
+ jobConf.setNumMapTasks(num_maps);
+ jobConf.setNumReduceTasks(num_reduces);
+
+ if (otherArgs.size() < 2) {
+ System.out.println("ERROR: Wrong number of parameters: ");
+ return printUsage();
+ }
+
+ jobConf.setOutputPath(new Path(otherArgs.remove(otherArgs.size() - 1)));
+ List<Path> plist = new ArrayList<Path>(otherArgs.size());
+ for (String s : otherArgs) {
+ plist.add(new Path(s));
+ }
+
+ jobConf.setInputFormat(CompositeInputFormat.class);
+ jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
+ op, inputFormatClass, plist.toArray(new Path[0])));
+ jobConf.setOutputFormat(outputFormatClass);
+
+ jobConf.setOutputKeyClass(outputKeyClass);
+ jobConf.setOutputValueClass(outputValueClass);
+
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ JobClient.runJob(jobConf);
+ Date end_time = new Date();
+ System.out.println("Job ended: " + end_time);
+ System.out.println("The job took " +
+ (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new Join(), args);
+ System.exit(res);
+ }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java?rev=602240&r1=602239&r2=602240&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java Fri Dec 7 14:01:32 2007
@@ -218,7 +218,7 @@
return new CopyInCopyOutBuffer();
}
};
-
+
/**
* Make a copy of a writable object using serialization to a buffer.
* @param orig The object to copy
@@ -226,19 +226,30 @@
*/
public static Writable clone(Writable orig, JobConf conf) {
try {
- Writable newInst = (Writable)ReflectionUtils.newInstance(orig.getClass(),
- conf);
- CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
- buffer.outBuffer.reset();
- orig.write(buffer.outBuffer);
- buffer.moveData();
- newInst.readFields(buffer.inBuffer);
+ Writable newInst =
+ (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+ cloneInto(newInst, orig);
return newInst;
} catch (IOException e) {
throw new RuntimeException("Error writing/reading clone buffer", e);
}
}
-
+
+ /**
+ * Make a copy of the writable object using serialization to a buffer.
+ * @param dst the object to copy into, whose previous state is overwritten
+ * @param src the object to copy from
+ * @throws IOException
+ */
+ public static void cloneInto(Writable dst, Writable src) throws IOException {
+ CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
+ buffer.outBuffer.reset();
+ src.write(buffer.outBuffer);
+ buffer.moveData();
+ dst.readFields(buffer.inBuffer);
+ return;
+ }
+
/**
* Serializes an integer to a binary stream with zero-compressed encoding.
* For -120 <= i <= 127, only one byte is used with the actual value.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class provides an implementation of ResetableIterator. The
+ * implementation uses an {@link java.util.ArrayList} to store elements
+ * added to it, replaying them as requested.
+ * Prefer {@link StreamBackedIterator}.
+ */
+public class ArrayListBackedIterator<X extends Writable>
+ implements ResetableIterator<X> {
+
+ private Iterator<X> iter;
+ private ArrayList<X> data;
+ private X hold = null;
+
+ public ArrayListBackedIterator() {
+ this(new ArrayList<X>());
+ }
+
+ public ArrayListBackedIterator(ArrayList<X> data) {
+ this.data = data;
+ this.iter = this.data.iterator();
+ }
+
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean next(X val) throws IOException {
+ if (iter.hasNext()) {
+ WritableUtils.cloneInto(val, iter.next());
+ if (null == hold) {
+ hold = (X) WritableUtils.clone(val, null);
+ } else {
+ WritableUtils.cloneInto(hold, val);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public void replay(X val) throws IOException {
+ WritableUtils.cloneInto(val, hold);
+ }
+
+ public void reset() {
+ iter = data.iterator();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void add(X item) throws IOException {
+ data.add((X) WritableUtils.clone(item, null));
+ }
+
+ public void close() throws IOException {
+ iter = null;
+ data = null;
+ }
+
+ public void clear() {
+ data.clear();
+ reset();
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Refinement of InputFormat requiring implementors to provide
+ * ComposableRecordReader instead of RecordReader.
+ */
+public interface ComposableInputFormat<K extends WritableComparable,
+ V extends Writable>
+ extends InputFormat<K,V> {
+
+ ComposableRecordReader<K,V> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException;
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Additional operations required of a RecordReader to participate in a join.
+ */
+interface ComposableRecordReader<K extends WritableComparable,
+ V extends Writable>
+ extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
+
+ /**
+ * Return the position in the collector this class occupies.
+ */
+ int id();
+
+ /**
+ * Return the key this RecordReader would supply on a call to next(K,V)
+ */
+ K key();
+
+ /**
+ * Clone the key at the head of this RecordReader into the object provided.
+ */
+ void key(K key) throws IOException;
+
+ /**
+ * Returns true if the stream is not empty, but provides no guarantee that
+ * a call to next(K,V) will succeed.
+ */
+ boolean hasNext();
+
+ /**
+ * Skip key-value pairs with keys less than or equal to the key provided.
+ */
+ void skip(K key) throws IOException;
+
+ /**
+ * While key-value pairs from this RecordReader match the given key, register
+ * them with the JoinCollector provided.
+ */
+ void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An InputFormat capable of performing joins over a set of data sources sorted
+ * and partitioned the same way.
+ * @see #setFormat
+ *
+ * A user may define new join types by setting the property
+ * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
+ * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
+ * ComposableRecordReader.
+ * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
+ * in the join.
+ * @see JoinRecordReader
+ * @see MultiFilterRecordReader
+ */
+public class CompositeInputFormat<K extends WritableComparable>
+ implements ComposableInputFormat<K,TupleWritable> {
+
+ // expression parse tree to which IF requests are proxied
+ private Parser.Node root;
+
+ public CompositeInputFormat() { }
+
+
+ /**
+ * Interpret a given string as a composite expression.
+ * {@code
+ * func ::= <ident>([<func>,]*<func>)
+ * func ::= tbl(<class>,"<path>")
+ * class ::= @see java.lang.Class#forName(java.lang.String)
+ * path ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
+ * }
+ * Reads expression from the <tt>mapred.join.expr</tt> property and
+ * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
+ * types. Paths supplied to <tt>tbl</tt> are given as input paths to the
+ * InputFormat class listed.
+ * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
+ */
+ public void setFormat(JobConf job) throws IOException {
+ addDefaults();
+ addUserIdentifiers(job);
+ Class<? extends WritableComparator> cmpcl =
+ job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
+ root = Parser.parse(job.get("mapred.join.expr", null), cmpcl);
+ }
+
+ /**
+ * Adds the default set of identifiers to the parser.
+ */
+ protected void addDefaults() {
+ try {
+ Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
+ Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
+ Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
+ Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("FATAL: Failed to init defaults", e);
+ }
+ }
+
+ /**
+ * Inform the parser of user-defined types.
+ */
+ private void addUserIdentifiers(JobConf job) throws IOException {
+ Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
+ for (Map.Entry<String,String> kv : job) {
+ Matcher m = x.matcher(kv.getKey());
+ if (m.matches()) {
+ try {
+ Parser.CNode.addIdentifier(m.group(1),
+ job.getClass(m.group(0), null, ComposableRecordReader.class));
+ } catch (NoSuchMethodException e) {
+ throw (IOException)new IOException(
+ "Invalid define for " + m.group(1)).initCause(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Verify that this composite has children and that all its children
+ * can validate their input.
+ */
+ public void validateInput(JobConf job) throws IOException {
+ setFormat(job);
+ root.validateInput(job);
+ }
+
+ /**
+ * Build a CompositeInputSplit from the child InputFormats by assigning the
+ * ith split from each child to the ith composite split.
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ setFormat(job);
+ job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+ return root.getSplits(job, numSplits);
+ }
+
+ /**
+ * Construct a CompositeRecordReader for the children of this InputFormat
+ * as defined in the init expression.
+ * The outermost join need only be composable, not necessarily a composite.
+ * Mandating TupleWritable isn't strictly correct.
+ */
+ @SuppressWarnings("unchecked") // child types unknown
+ public ComposableRecordReader<K,TupleWritable> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ setFormat(job);
+ return root.getRecordReader(split, job, reporter);
+ }
+
+ /**
+ * Convenience method for constructing composite formats.
+ * Given InputFormat class (inf), path (p) return:
+ * {@code tbl(<inf>, <p>) }
+ */
+ public static String compose(Class<? extends InputFormat> inf, String path) {
+ return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+ }
+
+ /**
+ * Convenience method for constructing composite formats.
+ * Given operation (op), Object class (inf), set of paths (p) return:
+ * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+ */
+ public static String compose(String op, Class<? extends InputFormat> inf,
+ String... path) {
+ final String infname = inf.getName();
+ StringBuffer ret = new StringBuffer(op + '(');
+ for (String p : path) {
+ compose(infname, p, ret);
+ ret.append(',');
+ }
+ ret.setCharAt(ret.length() - 1, ')');
+ return ret.toString();
+ }
+
+ /**
+ * Convenience method for constructing composite formats.
+ * Given operation (op), Object class (inf), set of paths (p) return:
+ * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+ */
+ public static String compose(String op, Class<? extends InputFormat> inf,
+ Path... path) {
+ ArrayList<String> tmp = new ArrayList<String>(path.length);
+ for (Path p : path) {
+ tmp.add(p.toString());
+ }
+ return compose(op, inf, tmp.toArray(new String[0]));
+ }
+
+ private static StringBuffer compose(String inf, String path,
+ StringBuffer sb) {
+ sb.append("tbl(" + inf + ",\"");
+ sb.append(path);
+ sb.append("\")");
+ return sb;
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
+ * into this collection must have a public default constructor.
+ */
+public class CompositeInputSplit implements InputSplit {
+
+ private int fill = 0;
+ private long totsize = 0L;
+ private InputSplit[] splits;
+
+ public CompositeInputSplit() { }
+
+ public CompositeInputSplit(int capacity) {
+ splits = new InputSplit[capacity];
+ }
+
+ /**
+ * Add an InputSplit to this collection.
+ * @throws IOException If capacity was not specified during construction
+ * or if capacity has been reached.
+ */
+ public void add(InputSplit s) throws IOException {
+ if (null == splits) {
+ throw new IOException("Uninitialized InputSplit");
+ }
+ if (fill == splits.length) {
+ throw new IOException("Too many splits");
+ }
+ splits[fill++] = s;
+ totsize += s.getLength();
+ }
+
+ /**
+ * Get ith child InputSplit.
+ */
+ public InputSplit get(int i) {
+ return splits[i];
+ }
+
+ /**
+ * Return the aggregate length of all child InputSplits currently added.
+ */
+ public long getLength() throws IOException {
+ return totsize;
+ }
+
+ /**
+ * Get the length of ith child InputSplit.
+ */
+ public long getLength(int i) throws IOException {
+ return splits[i].getLength();
+ }
+
+ /**
+ * Collect a set of hosts from all child InputSplits.
+ */
+ public String[] getLocations() throws IOException {
+ HashSet<String> hosts = new HashSet<String>();
+ for (InputSplit s : splits) {
+ String[] hints = s.getLocations();
+ if (hints != null && hints.length > 0) {
+ for (String host : hints) {
+ hosts.add(host);
+ }
+ }
+ }
+ return hosts.toArray(new String[hosts.size()]);
+ }
+
+ /**
+ * getLocations from ith InputSplit.
+ */
+ public String[] getLocation(int i) throws IOException {
+ return splits[i].getLocations();
+ }
+
+ /**
+ * Write splits in the following format.
+ * {@code
+ * <count><class1><class2>...<classn><split1><split2>...<splitn>
+ * }
+ */
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, splits.length);
+ for (InputSplit s : splits) {
+ Text.writeString(out, s.getClass().getName());
+ }
+ for (InputSplit s : splits) {
+ s.write(out);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws IOException If the child InputSplit cannot be read, typically
+ * for failing access checks.
+ */
+ @SuppressWarnings("unchecked") // Explicit check for split class agreement
+ public void readFields(DataInput in) throws IOException {
+ int card = WritableUtils.readVInt(in);
+ if (splits == null || splits.length != card) {
+ splits = new InputSplit[card];
+ }
+ Class<? extends InputSplit>[] cls = new Class[card];
+ try {
+ for (int i = 0; i < card; ++i) {
+ cls[i] =
+ Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
+ }
+ for (int i = 0; i < card; ++i) {
+ splits[i] = (InputSplit) ReflectionUtils.newInstance(cls[i], null);
+ splits[i].readFields(in);
+ }
+ } catch (ClassNotFoundException e) {
+ throw (IOException)new IOException("Failed split init").initCause(e);
+ }
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A RecordReader that can effect joins of RecordReaders sharing a common key
+ * type and partitioning.
+ */
+public abstract class CompositeRecordReader<
+ K extends WritableComparable, // key type
+ V extends Writable, // accepts RecordReader<K,V> as children
+ X extends Writable> // emits Writables of this type
+ implements Configurable {
+
+
+ private int id;
+ private Configuration conf;
+ private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
+
+ private WritableComparator cmp;
+ private Class<? extends WritableComparable> keyclass;
+ private PriorityQueue<ComposableRecordReader<K,?>> q;
+
+ protected final JoinCollector jc;
+ protected final ComposableRecordReader<K,? extends V>[] kids;
+
+ protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+ /**
+ * Create a RecordReader with <tt>capacity</tt> children to position
+ * <tt>id</tt> in the parent reader.
+ * The id of a root CompositeRecordReader is -1 by convention, but relying
+ * on this is not recommended.
+ */
+ @SuppressWarnings("unchecked") // Generic array assignment
+ public CompositeRecordReader(int id, int capacity,
+ Class<? extends WritableComparator> cmpcl)
+ throws IOException {
+ assert capacity > 0 : "Invalid capacity";
+ this.id = id;
+ if (null != cmpcl) {
+ cmp = (WritableComparator)ReflectionUtils.newInstance(cmpcl, null);
+ q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+ new Comparator<ComposableRecordReader<K,?>>() {
+ public int compare(ComposableRecordReader<K,?> o1,
+ ComposableRecordReader<K,?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
+ }
+ jc = new JoinCollector(capacity);
+ kids = new ComposableRecordReader[capacity];
+ }
+
+ /**
+ * Return the position in the collector this class occupies.
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Return sorted list of RecordReaders for this composite.
+ */
+ protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
+ return q;
+ }
+
+ /**
+ * Return comparator defining the ordering for RecordReaders in this
+ * composite.
+ */
+ protected WritableComparator getComparator() {
+ return cmp;
+ }
+
+ /**
+ * Add a RecordReader to this collection.
+ * The id() of a RecordReader determines where in the Tuple its
+ * entry will appear. Adding RecordReaders with the same id has
+ * undefined behavior.
+ */
+ public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
+ kids[rr.id()] = rr;
+ if (null == q) {
+ cmp = WritableComparator.get(rr.createKey().getClass());
+ q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+ new Comparator<ComposableRecordReader<K,?>>() {
+ public int compare(ComposableRecordReader<K,?> o1,
+ ComposableRecordReader<K,?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
+ }
+ q.add(rr);
+ }
+
+ /**
+ * Collector for join values.
+ * This accumulates values for a given key from the child RecordReaders. If
+ * one or more child RR contain duplicate keys, this will emit the cross
+ * product of the associated values until exhausted.
+ */
+ class JoinCollector {
+ private K key;
+ private ResetableIterator<X>[] iters;
+ private long partial = 0L;
+ private long replaymask = 0L;
+ private int start = 0;
+ private int pos = -1;
+ private int iterpos = -1;
+ private boolean first = true;
+
+ /**
+ * Construct a collector capable of handling the specified number of
+ * children.
+ */
+ @SuppressWarnings("unchecked") // Generic array assignment
+ public JoinCollector(int card) {
+ iters = new ResetableIterator[card];
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i] = EMPTY;
+ }
+ }
+
+ /**
+ * Register a given iterator at position id.
+ */
+ public void add(int id, ResetableIterator<X> i)
+ throws IOException {
+ iters[id] = i;
+ }
+
+ /**
+ * Return the key associated with this collection.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Codify the contents of the collector to be iterated over.
+ * When this is called, all RecordReaders registered for this
+ * key should have added ResetableIterators.
+ */
+ public void reset(K key) {
+ this.key = key;
+ start = 0;
+ pos = 0;
+ first = true;
+ partial = 0L;
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].reset();
+ }
+ }
+
+ /**
+ * Clear all state information.
+ */
+ public void clear() {
+ key = null;
+ pos = -1;
+ first = true;
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].clear();
+ iters[i] = EMPTY;
+ }
+ partial = 0L;
+ }
+
+ /**
+ * Returns false if exhausted or if reset(K) has not been called.
+ */
+ protected boolean hasNext() {
+ return !(pos < 0);
+ }
+
+ /**
+ * Populate Tuple from iterators.
+ * It should be the case that, given iterators i_1...i_n over values from
+ * sources s_1...s_n sharing key k, repeated calls to next should yield
+ * I x I.
+ */
+ @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+ protected boolean next(TupleWritable val) throws IOException {
+ if (pos < 0) {
+ clear();
+ return false;
+ }
+ int i = start;
+ if (first) { // Find first iterator with elements
+ for (; i < iters.length && !iters[i].hasNext(); ++i);
+ if (iters.length <= i) { // no children had key
+ clear();
+ return false;
+ }
+ start = i;
+ for (int j = i; j < iters.length; ++j) {
+ if (iters[j].hasNext()) {
+ partial |= 1 << j;
+ }
+ }
+ iterpos = pos = iters.length - 1;
+ first = false;
+ } else { // Copy all elements in partial into tuple
+ for (; i < iterpos; ++i) {
+ if ((partial & (1 << i)) != 0) {
+ iters[i].replay((X)val.get(i));
+ val.setWritten(i);
+ }
+ }
+ }
+ long partialwritten = val.mask();
+ if (iters[i].next((X)val.get(i))) {
+ val.setWritten(i);
+ }
+ for (++i; i < iters.length; ++i) {
+ iters[i].reset();
+ if (iters[i].hasNext() && iters[i].next((X)val.get(i))) {
+ val.setWritten(i);
+ }
+ }
+ iterpos = iters.length - 1;
+ for (; iterpos > pos && !iters[iterpos].hasNext(); --iterpos);
+ if (!iters[iterpos].hasNext()) {
+ for (; !(pos < 0 || iters[pos].hasNext()); --pos);
+ iterpos = pos;
+ }
+ replaymask = val.mask();
+ if ((replaymask ^ partialwritten) == 0L) {
+ return next(val);
+ }
+ return true;
+ }
+
+ /**
+ * Replay the last Tuple emitted.
+ */
+ @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+ public void replay(TupleWritable val) throws IOException {
+ // The last emitted tuple might have drawn on an empty source;
+ // it can't be cleared prematurely, b/c there may be more duplicate
+ // keys in iterator positions < pos
+ if (first) {
+ throw new IllegalStateException();
+ }
+ for (int i = 0; i < iters.length; ++i) {
+ if ((replaymask & (1 << i)) != 0) {
+ iters[i].replay((X)val.get(i));
+ }
+ }
+ }
+
+ /**
+ * Close all child iterators.
+ */
+ public void close() throws IOException {
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].close();
+ }
+ }
+
+ /**
+ * Write the next value into key, value as accepted by the operation
+ * associated with this set of RecordReaders.
+ */
+ public boolean flush(TupleWritable value) throws IOException {
+ while (hasNext()) {
+ value.clearWritten();
+ if (next(value) && combine(kids, value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Return the key for the current join or the value at the top of the
+ * RecordReader heap.
+ */
+ public K key() {
+ if (jc.hasNext()) {
+ return jc.key();
+ }
+ if (!q.isEmpty()) {
+ return q.peek().key();
+ }
+ return null;
+ }
+
+ /**
+ * Clone the key at the top of this RR into the given object.
+ */
+ public void key(K key) throws IOException {
+ WritableUtils.cloneInto(key, key());
+ }
+
+ /**
+ * Return true if it is possible that this could emit more values.
+ */
+ public boolean hasNext() {
+ return jc.hasNext() || !q.isEmpty();
+ }
+
+ /**
+ * Pass skip key to child RRs.
+ */
+ public void skip(K key) throws IOException {
+ ArrayList<ComposableRecordReader<K,?>> tmp =
+ new ArrayList<ComposableRecordReader<K,?>>();
+ while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
+ tmp.add(q.poll());
+ }
+ for (ComposableRecordReader<K,?> rr : tmp) {
+ rr.skip(key);
+ q.add(rr);
+ }
+ }
+
+ /**
+ * Obtain an iterator over the child RRs apropos of the value type
+ * ultimately emitted from this join.
+ */
+ protected abstract ResetableIterator<X> getDelegate();
+
+ /**
+ * If key provided matches that of this Composite, give JoinCollector
+ * iterator over values it may emit.
+ */
+ @SuppressWarnings("unchecked") // No values from static EMPTY class
+ public void accept(CompositeRecordReader.JoinCollector jc, K key)
+ throws IOException {
+ if (hasNext() && 0 == cmp.compare(key, key())) {
+ fillJoinCollector(createKey());
+ jc.add(id, getDelegate());
+ return;
+ }
+ jc.add(id, EMPTY);
+ }
+
+ /**
+ * For all child RRs offering the key provided, obtain an iterator
+ * at that position in the JoinCollector.
+ */
+ protected void fillJoinCollector(K iterkey) throws IOException {
+ if (!q.isEmpty()) {
+ q.peek().key(iterkey);
+ while (0 == cmp.compare(q.peek().key(), iterkey)) {
+ ComposableRecordReader<K,?> t = q.poll();
+ t.accept(jc, iterkey);
+ if (t.hasNext()) {
+ q.add(t);
+ } else if (q.isEmpty()) {
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Implement Comparable contract (compare key of join or head of heap
+ * with that of another).
+ */
+ public int compareTo(ComposableRecordReader<K,?> other) {
+ return cmp.compare(key(), other.key());
+ }
+
+ /**
+ * Create a new key value common to all child RRs.
+ * @throws ClassCastException if key classes differ.
+ */
+ @SuppressWarnings("unchecked") // Explicit check for key class agreement
+ public K createKey() {
+ if (null == keyclass) {
+ final Class<?> cls = kids[0].createKey().getClass();
+ for (RecordReader<K,? extends Writable> rr : kids) {
+ if (!cls.equals(rr.createKey().getClass())) {
+ throw new ClassCastException("Child key classes fail to agree");
+ }
+ }
+ keyclass = cls.asSubclass(WritableComparable.class);
+ }
+ return (K) ReflectionUtils.newInstance(keyclass, getConf());
+ }
+
+ /**
+ * Create a value to be used internally for joins.
+ */
+ protected TupleWritable createInternalValue() {
+ Writable[] vals = new Writable[kids.length];
+ for (int i = 0; i < vals.length; ++i) {
+ vals[i] = kids[i].createValue();
+ }
+ return new TupleWritable(vals);
+ }
+
+ /**
+ * Unsupported (returns zero in all cases).
+ */
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ /**
+ * Close all child RRs.
+ */
+ public void close() throws IOException {
+ if (kids != null) {
+ for (RecordReader<K,? extends Writable> rr : kids) {
+ rr.close();
+ }
+ }
+ if (jc != null) {
+ jc.close();
+ }
+ }
+
+ /**
+ * Report progress as the minimum of all child RR progress.
+ */
+ public float getProgress() throws IOException {
+ float ret = 1.0f;
+ for (RecordReader<K,? extends Writable> rr : kids) {
+ ret = Math.min(ret, rr.getProgress());
+ }
+ return ret;
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full inner join: a tuple is emitted only when every position in it has
+ * been written.
+ */
+public class InnerJoinRecordReader<K extends WritableComparable>
+    extends JoinRecordReader<K> {
+
+  InnerJoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Return true iff the tuple is full (all data sources contain this key).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    // Advance past every populated position; stopping early means a
+    // source is missing from the tuple.
+    int filled = 0;
+    while (filled < srcs.length && dst.has(filled)) {
+      ++filled;
+    }
+    return filled == srcs.length;
+  }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Base class for Composite joins returning Tuples of arbitrary Writables.
+ */
+public abstract class JoinRecordReader<K extends WritableComparable>
+    extends CompositeRecordReader<K,Writable,TupleWritable>
+    implements ComposableRecordReader<K,TupleWritable> {
+
+  public JoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * Emit the next set of key, value pairs as defined by the child
+   * RecordReaders and operation associated with this composite RR.
+   * Tuples still pending in the collector are served first; after that the
+   * collector is refilled from the reader queue until a tuple is accepted
+   * or the queue is empty.
+   */
+  public boolean next(K key, TupleWritable value) throws IOException {
+    if (emitPending(key, value)) {
+      return true;
+    }
+    final K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> readers =
+      getRecordReaderQueue();
+    while (!readers.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (emitPending(key, value)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Flush one accepted tuple from the collector into <tt>value</tt> and
+   * copy the collector's key into <tt>key</tt>; if the collector yields
+   * nothing, clear it.
+   */
+  private boolean emitPending(K key, TupleWritable value) throws IOException {
+    if (!jc.flush(value)) {
+      jc.clear();
+      return false;
+    }
+    WritableUtils.cloneInto(key, jc.key());
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  public TupleWritable createValue() {
+    return createInternalValue();
+  }
+
+  /**
+   * Return an iterator wrapping the JoinCollector.
+   */
+  protected ResetableIterator<TupleWritable> getDelegate() {
+    return new JoinDelegationIterator();
+  }
+
+  /**
+   * Since the JoinCollector is effecting our operation, we need only
+   * provide an iterator proxy wrapping its operation.
+   * Every method forwards to the enclosing reader's collector, except
+   * <tt>add</tt>, which is unsupported.
+   */
+  protected class JoinDelegationIterator
+      implements ResetableIterator<TupleWritable> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(TupleWritable val) throws IOException {
+      return jc.flush(val);
+    }
+
+    public void replay(TupleWritable val) throws IOException {
+      jc.replay(val);
+    }
+
+    public void reset() {
+      // Restart enumeration for the key the collector already holds.
+      jc.reset(jc.key());
+    }
+
+    public void add(TupleWritable item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Base class for Composite join returning values derived from multiple
+ * sources, but generally not tuples.
+ * Tuples are assembled in an internal TupleWritable and reduced to a single
+ * value by {@link #emit}.
+ */
+public abstract class MultiFilterRecordReader<K extends WritableComparable,
+                                              V extends Writable>
+    extends CompositeRecordReader<K,V,V>
+    implements ComposableRecordReader<K,V> {
+
+  private Class<? extends Writable> valueclass;
+  // Scratch tuple the collector writes into; created on first use.
+  private TupleWritable ivalue;
+
+  public MultiFilterRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * For each tuple emitted, return a value (typically one of the values
+   * in the tuple).
+   * Modifying the Writables in the tuple is permitted and unlikely to affect
+   * join behavior in most cases, but it is not recommended. It's safer to
+   * clone first.
+   */
+  protected abstract V emit(TupleWritable dst) throws IOException;
+
+  /**
+   * Default implementation offers {@link #emit} every Tuple from the
+   * collector (the outer join of child RRs).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    return true;
+  }
+
+  /**
+   * Return the internal tuple, creating it if {@link #createValue} has not
+   * already done so. Without this, reading before the first createValue()
+   * call would hand a null tuple to the collector.
+   */
+  private TupleWritable internalValue() {
+    if (null == ivalue) {
+      ivalue = createInternalValue();
+    }
+    return ivalue;
+  }
+
+  /** {@inheritDoc} */
+  public boolean next(K key, V value) throws IOException {
+    final TupleWritable tuple = internalValue();
+    // Serve anything still pending in the collector first.
+    if (jc.flush(tuple)) {
+      WritableUtils.cloneInto(key, jc.key());
+      WritableUtils.cloneInto(value, emit(tuple));
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    // Refill from the reader queue until a tuple is accepted.
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (jc.flush(tuple)) {
+        WritableUtils.cloneInto(key, jc.key());
+        WritableUtils.cloneInto(value, emit(tuple));
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The value class is taken from the children on first use and cached.
+   * @throws ClassCastException if child value classes differ.
+   */
+  @SuppressWarnings("unchecked") // Explicit check for value class agreement
+  public V createValue() {
+    if (null == valueclass) {
+      final Class<?> cls = kids[0].createValue().getClass();
+      for (RecordReader<K,? extends V> rr : kids) {
+        if (!cls.equals(rr.createValue().getClass())) {
+          throw new ClassCastException("Child value classes fail to agree");
+        }
+      }
+      valueclass = cls.asSubclass(Writable.class);
+      ivalue = createInternalValue();
+    }
+    return (V) ReflectionUtils.newInstance(valueclass, null);
+  }
+
+  /**
+   * Return an iterator returning a single value from the tuple.
+   * @see MultiFilterDelegationIterator
+   */
+  protected ResetableIterator<V> getDelegate() {
+    return new MultiFilterDelegationIterator();
+  }
+
+  /**
+   * Proxy the JoinCollector, but include callback to emit.
+   */
+  protected class MultiFilterDelegationIterator
+      implements ResetableIterator<V> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(V val) throws IOException {
+      final TupleWritable tuple = internalValue();
+      final boolean ret = jc.flush(tuple);
+      if (ret) {
+        WritableUtils.cloneInto(val, emit(tuple));
+      }
+      return ret;
+    }
+
+    public void replay(V val) throws IOException {
+      // Re-derive the value from the tuple populated by the last next().
+      WritableUtils.cloneInto(val, emit(ivalue));
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(V item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full outer join.
+ */
+public class OuterJoinRecordReader<K extends WritableComparable>
+ extends JoinRecordReader<K> {
+
+ OuterJoinRecordReader(int id, JobConf conf, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, conf, capacity, cmpcl);
+ }
+
+ /**
+ * Emit everything from the collector.
+ */
+ protected boolean combine(Object[] srcs, TupleWritable dst) {
+ assert srcs.length == dst.size();
+ return true;
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Prefer the "rightmost" data source for this key.
+ * For example, <tt>override(S1,S2,S3)</tt> will prefer values
+ * from S3 over S2, and values from S2 over S1 for all keys
+ * emitted from all sources.
+ */
+public class OverrideRecordReader<K extends WritableComparable,
+                                  V extends Writable>
+    extends MultiFilterRecordReader<K,V> {
+
+  OverrideRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Emit the first value the tuple's iterator yields. Because
+   * fillJoinCollector registers only the preferred source for each key,
+   * that is the value from the rightmost source offering the key.
+   */
+  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+  protected V emit(TupleWritable dst) {
+    return (V) dst.iterator().next();
+  }
+
+  /**
+   * Instead of filling the JoinCollector with iterators from all
+   * data sources, fill only the rightmost for this key.
+   * This not only saves space by discarding the other sources, but
+   * it also emits the number of key-value pairs in the preferred
+   * RecordReader instead of repeating that stream n times, where
+   * n is the cardinality of the cross product of the discarded
+   * streams for the given key.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    final PriorityQueue<ComposableRecordReader<K,?>> readers =
+      getRecordReaderQueue();
+    if (readers.isEmpty()) {
+      return;
+    }
+    final ArrayList<ComposableRecordReader<K,?>> matching =
+      new ArrayList<ComposableRecordReader<K,?>>(kids.length);
+    readers.peek().key(iterkey);
+    final WritableComparator cmp = getComparator();
+    // Index within 'matching' of the reader with the greatest id so far.
+    int preferred = -1;
+    // Pull every reader positioned at iterkey off the heap.
+    while (!readers.isEmpty()
+           && 0 == cmp.compare(readers.peek().key(), iterkey)) {
+      final ComposableRecordReader<K,?> candidate = readers.poll();
+      if (-1 == preferred || matching.get(preferred).id() < candidate.id()) {
+        preferred = matching.size();
+      }
+      matching.add(candidate);
+    }
+    // Only the preferred reader feeds the collector; the rest move past
+    // this key.
+    final ComposableRecordReader<K,?> winner = matching.remove(preferred);
+    winner.accept(jc, iterkey);
+    for (ComposableRecordReader<K,?> other : matching) {
+      other.skip(iterkey);
+    }
+    matching.add(winner);
+    // Readers with remaining input go back on the heap.
+    for (ComposableRecordReader<K,?> rr : matching) {
+      if (rr.hasNext()) {
+        readers.add(rr);
+      }
+    }
+  }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.CharArrayReader;
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Very simple shift-reduce parser for join expressions.
+ *
+ * This should be sufficient for the user extension permitted now, but ought to
+ * be replaced with a parser generator if more complex grammars are supported.
+ * In particular, this "shift-reduce" parser has no states. Each set
+ * of formals requires a different internal node type, which is responsible for
+ * interpreting the list of tokens it receives. This is sufficient for the
+ * current grammar, but it has several annoying properties that might inhibit
+ * extension. In particular, parentheses are always function calls; an
+ * algebraic or filter grammar would not only require a node type, but must
+ * also work around the internals of this parser.
+ *
+ * For most other cases, adding classes to the hierarchy- particularly by
+ * extending JoinRecordReader and MultiFilterRecordReader- is fairly
+ * straightforward. One need only override the relevant method(s) (usually only
+ * {@link CompositeRecordReader#combine}) and include a property to map its
+ * value to an identifier in the parser.
+ */
+public class Parser {
+ public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
+
+  /**
+   * Tagged-union type for tokens from the join expression.
+   * The base class carries only the tag; each typed accessor throws an
+   * IOException unless a subclass overrides it to supply a payload.
+   * @see Parser.TType
+   */
+  public static class Token {
+
+    private final TType type;
+
+    Token(TType type) {
+      this.type = type;
+    }
+
+    /** Return the tag identifying the kind of this token. */
+    public TType getType() {
+      return type;
+    }
+
+    /** @throws IOException always, as this token carries no node */
+    public Node getNode() throws IOException {
+      throw new IOException("Expected nodetype");
+    }
+
+    /** @throws IOException always, as this token carries no number */
+    public double getNum() throws IOException {
+      throw new IOException("Expected numtype");
+    }
+
+    /** @throws IOException always, as this token carries no string */
+    public String getStr() throws IOException {
+      throw new IOException("Expected strtype");
+    }
+  }
+
+  /**
+   * Token tagged {@link TType#NUM} that carries a numeric value.
+   */
+  public static class NumToken extends Token {
+
+    private final double num;
+
+    public NumToken(double num) {
+      super(TType.NUM);
+      this.num = num;
+    }
+
+    /** Return the number held by this token. */
+    @Override
+    public double getNum() {
+      return num;
+    }
+  }
+
+  /**
+   * Token tagged {@link TType#CIF} that carries a parse-tree node.
+   */
+  public static class NodeToken extends Token {
+
+    private final Node node;
+
+    NodeToken(Node node) {
+      super(TType.CIF);
+      this.node = node;
+    }
+
+    /** Return the node held by this token. */
+    @Override
+    public Node getNode() {
+      return node;
+    }
+  }
+
+  /**
+   * Token carrying a string; the tag is chosen by the creator (the lexer
+   * uses it for both bare words and quoted strings).
+   */
+  public static class StrToken extends Token {
+
+    private final String str;
+
+    public StrToken(TType type, String str) {
+      super(type);
+      this.str = str;
+    }
+
+    /** Return the string held by this token. */
+    @Override
+    public String getStr() {
+      return str;
+    }
+  }
+
+  /**
+   * Simple lexer wrapping a StreamTokenizer.
+   * This encapsulates the creation of tagged-union Tokens and initializes the
+   * StreamTokenizer.
+   */
+  private static class Lexer {
+
+    private final StreamTokenizer tok;
+
+    /**
+     * Prepare to tokenize the given expression. Double quotes delimit
+     * strings; ',', '(' and ')' are each returned as tokens of their own.
+     */
+    Lexer(String s) {
+      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
+      tok.quoteChar('"');
+      tok.parseNumbers();
+      tok.ordinaryChar(',');
+      tok.ordinaryChar('(');
+      tok.ordinaryChar(')');
+      // '$' and '_' are legal in class names (e.g. nested classes such as
+      // Outer$Inner), but StreamTokenizer treats them as ordinary characters
+      // by default, which next() would reject. Make them part of words.
+      tok.wordChars('$', '$');
+      tok.wordChars('_', '_');
+    }
+
+    /**
+     * Read the next token from the expression.
+     * @return the next token, or null once the input is exhausted
+     * @throws IOException if a character outside the grammar is encountered
+     */
+    Token next() throws IOException {
+      final int type = tok.nextToken();
+      switch (type) {
+        case StreamTokenizer.TT_EOF:
+        case StreamTokenizer.TT_EOL:
+          return null;
+        case StreamTokenizer.TT_NUMBER:
+          return new NumToken(tok.nval);
+        case StreamTokenizer.TT_WORD:
+          return new StrToken(TType.IDENT, tok.sval);
+        case '"':
+          return new StrToken(TType.QUOT, tok.sval);
+        case ',':
+          return new Token(TType.COMMA);
+        case '(':
+          return new Token(TType.LPAREN);
+        case ')':
+          return new Token(TType.RPAREN);
+        default:
+          throw new IOException("Unexpected: " + type);
+      }
+    }
+  }
+
+  /**
+   * Base class for nodes of the join-expression parse tree. It also holds
+   * the static registries mapping an identifier to the constructors of its
+   * node type and of its ComposableRecordReader.
+   */
+  public abstract static class Node implements ComposableInputFormat {
+    /**
+     * Return the node type registered for the particular identifier.
+     * By default, this is a CNode for any composite node and a WNode
+     * for "wrapped" nodes. User nodes will likely be composite
+     * nodes.
+     * @throws IOException if no node type is registered for the identifier
+     *         or if the node cannot be instantiated
+     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
+     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
+     */
+    static Node forIdent(String ident) throws IOException {
+      // Only non-null constructors are ever stored, so a null result means
+      // the identifier was never registered.
+      final Constructor<? extends Node> cstr = nodeCstrMap.get(ident);
+      if (null == cstr) {
+        throw new IOException("No nodetype for " + ident);
+      }
+      Exception failure;
+      try {
+        return cstr.newInstance(ident);
+      } catch (IllegalAccessException e) {
+        failure = e;
+      } catch (InstantiationException e) {
+        failure = e;
+      } catch (InvocationTargetException e) {
+        failure = e;
+      }
+      throw (IOException)new IOException().initCause(failure);
+    }
+
+    // Signature of the constructor every node type must declare.
+    private static final Class<?>[] ncstrSig = { String.class };
+    // identifier -> constructor of the parse-tree node type
+    private static final
+        Map<String,Constructor<? extends Node>> nodeCstrMap =
+        new HashMap<String,Constructor<? extends Node>>();
+    // identifier -> constructor of the record reader built by that node
+    protected static final
+        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
+        new HashMap<String,Constructor<? extends ComposableRecordReader>>();
+
+    /**
+     * For a given identifier, add a mapping to the nodetype for the parse
+     * tree and to the ComposableRecordReader to be created, including the
+     * formals required to invoke the constructor.
+     * The nodetype and constructor signature should be filled in from the
+     * child node.
+     * @throws NoSuchMethodException if either class lacks a constructor
+     *         with the expected signature
+     */
+    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
+                              Class<? extends Node> nodetype,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      final Constructor<? extends Node> nodeCtor =
+        nodetype.getDeclaredConstructor(ncstrSig);
+      nodeCtor.setAccessible(true);
+      nodeCstrMap.put(ident, nodeCtor);
+
+      final Constructor<? extends ComposableRecordReader> readerCtor =
+        cl.getDeclaredConstructor(mcstrSig);
+      readerCtor.setAccessible(true);
+      rrCstrMap.put(ident, readerCtor);
+    }
+
+    // inst
+    protected int id = -1;
+    protected String ident;
+    protected Class<? extends WritableComparator> cmpcl;
+
+    protected Node(String ident) {
+      this.ident = ident;
+    }
+
+    /** Record the id assigned to this node. */
+    protected void setID(int id) {
+      this.id = id;
+    }
+
+    /** Record the comparator class used for keys. */
+    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      this.cmpcl = cmpcl;
+    }
+
+    /** Interpret the tokens found between this node's parentheses. */
+    abstract void parse(List<Token> args) throws IOException;
+  }
+
+  /**
+   * Nodetype in the parse tree for "wrapped" InputFormats.
+   * Requests are delegated to the wrapped InputFormat using a copy of the
+   * job configuration whose input path is the directory named in the
+   * expression.
+   */
+  static class WNode extends Node {
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, RecordReader.class, Class.class };
+
+    /** Register a record reader class for a wrapped-node identifier. */
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
+    }
+
+    private String indir;
+    private InputFormat inf;
+
+    public WNode(String ident) {
+      super(ident);
+    }
+
+    /**
+     * Let the first actual define the InputFormat and the second define
+     * the <tt>mapred.input.dir</tt> property.
+     * The string tokens preceding the first comma are concatenated to form
+     * the InputFormat class name; the token after it must be a quoted
+     * string.
+     */
+    public void parse(List<Token> ll) throws IOException {
+      final StringBuilder className = new StringBuilder();
+      final Iterator<Token> it = ll.iterator();
+      while (it.hasNext()) {
+        final Token cur = it.next();
+        if (!TType.COMMA.equals(cur.getType())) {
+          className.append(cur.getStr());
+          continue;
+        }
+        // First comma reached: everything before it names the InputFormat.
+        try {
+          inf = (InputFormat)ReflectionUtils.newInstance(
+              Class.forName(className.toString())
+                   .asSubclass(InputFormat.class),
+              null);
+        } catch (ClassNotFoundException e) {
+          throw (IOException)new IOException().initCause(e);
+        } catch (IllegalArgumentException e) {
+          throw (IOException)new IOException().initCause(e);
+        }
+        break;
+      }
+      if (!it.hasNext()) {
+        throw new IOException("Parse error");
+      }
+      final Token dir = it.next();
+      if (!TType.QUOT.equals(dir.getType())) {
+        throw new IOException("Expected quoted string");
+      }
+      indir = dir.getStr();
+      // no check for ll.isEmpty() to permit extension
+    }
+
+    /** Copy the job configuration and point its input path at indir. */
+    private JobConf getConf(JobConf job) {
+      final JobConf wrapped = new JobConf(job);
+      wrapped.setInputPath(new Path(indir));
+      return wrapped;
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      inf.validateInput(getConf(job));
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      return inf.getSplits(getConf(job), numSplits);
+    }
+
+    /**
+     * Wrap the RecordReader of the underlying InputFormat in the
+     * ComposableRecordReader registered for this node's identifier.
+     */
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      if (!rrCstrMap.containsKey(ident)) {
+        throw new IOException("No RecordReader for " + ident);
+      }
+      Exception failure;
+      try {
+        return rrCstrMap.get(ident).newInstance(id,
+            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
+      } catch (IllegalAccessException e) {
+        failure = e;
+      } catch (InstantiationException e) {
+        failure = e;
+      } catch (InvocationTargetException e) {
+        failure = e;
+      }
+      throw (IOException)new IOException().initCause(failure);
+    }
+
+    /** Render this node in the form ident(class,"path"). */
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append(ident).append("(");
+      sb.append(inf.getClass().getName());
+      sb.append(",\"").append(indir).append("\")");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Internal nodetype for "composite" InputFormats.
+   * A composite node owns an ordered list of child nodes and combines
+   * their splits and record readers.
+   */
+  static class CNode extends Node {
+
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
+
+    /** Register a record reader class for a composite-node identifier. */
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
+    }
+
+    // inst
+    private ArrayList<Node> kids = new ArrayList<Node>();
+
+    public CNode(String ident) {
+      super(ident);
+    }
+
+    /** Set the key comparator on this node and on every child. */
+    @Override
+    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      super.setKeyComparator(cmpcl);
+      for (Node n : kids) {
+        n.setKeyComparator(cmpcl);
+      }
+    }
+
+    /**
+     * Validate the input of every child.
+     * @throws IOException if this node has no children or a child rejects
+     *         its input
+     */
+    public void validateInput(JobConf job) throws IOException {
+      if (0 == kids.size()) {
+        throw new IOException("Childless composite");
+      }
+      for (Node n : kids) {
+        n.validateInput(job);
+      }
+    }
+
+    /**
+     * Combine InputSplits from child InputFormats into a
+     * {@link CompositeInputSplit}.
+     * The i-th returned split holds the i-th split of each child, in child
+     * order.
+     * @throws IOException if this node has no children, if a child returns
+     *         no splits array, or if the children disagree on the number
+     *         of splits
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      // Without children splits[0] below would not exist; report the same
+      // error as validateInput instead of failing on the array index.
+      if (kids.isEmpty()) {
+        throw new IOException("Childless composite");
+      }
+      InputSplit[][] splits = new InputSplit[kids.size()][];
+      for (int i = 0; i < kids.size(); ++i) {
+        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+        if (null == tmp) {
+          throw new IOException("Error gathering splits from child RReader");
+        }
+        if (i > 0 && splits[i-1].length != tmp.length) {
+          throw new IOException("Inconsistent split cardinality from child");
+        }
+        splits[i] = tmp;
+      }
+      final int size = splits[0].length;
+      CompositeInputSplit[] ret = new CompositeInputSplit[size];
+      for (int i = 0; i < size; ++i) {
+        ret[i] = new CompositeInputSplit(splits.length);
+        for (int j = 0; j < splits.length; ++j) {
+          ret[i].add(splits[j][i]);
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Create the composite record reader registered for this node's
+     * identifier and add to it one record reader per child, each built
+     * from the matching entry of the composite split.
+     * @throws IOException if the split is not a CompositeInputSplit, if no
+     *         reader is registered for the identifier, or if the reader
+     *         cannot be instantiated
+     */
+    @SuppressWarnings("unchecked") // child types unknowable
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      if (!(split instanceof CompositeInputSplit)) {
+        throw new IOException("Invalid split type:" +
+                              split.getClass().getName());
+      }
+      final CompositeInputSplit spl = (CompositeInputSplit)split;
+      final int capacity = kids.size();
+      CompositeRecordReader ret = null;
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        ret = (CompositeRecordReader)
+          rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+      for (int i = 0; i < capacity; ++i) {
+        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
+      }
+      return (ComposableRecordReader)ret;
+    }
+
+    /**
+     * Parse a list of comma-separated nodes.
+     * Tokens must alternate node, comma, node, ...; a trailing comma is
+     * tolerated. Each child receives its position among the children as
+     * its id.
+     * @throws IOException if a node is not followed by a comma or the end
+     *         of the list, or if a token in node position is not a node
+     */
+    @Override
+    public void parse(List<Token> args) throws IOException {
+      ListIterator<Token> i = args.listIterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        // Nodes sit at even list indices (commas at odd ones), so halving
+        // the index of the token just read yields the child's position.
+        t.getNode().setID(i.previousIndex() >> 1);
+        kids.add(t.getNode());
+        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
+          throw new IOException("Expected ','");
+        }
+      }
+    }
+
+    /**
+     * Render this node as ident(child,child,...). A node without children
+     * renders as ident().
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(ident).append('(');
+      String sep = "";
+      for (Node n : kids) {
+        sb.append(sep).append(n.toString());
+        sep = ",";
+      }
+      return sb.append(')').toString();
+    }
+  }
+
+ private static Token reduce(Stack<Token> st) throws IOException {
+ LinkedList<Token> args = new LinkedList<Token>();
+ while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
+ args.addFirst(st.pop());
+ }
+ if (st.isEmpty()) {
+ throw new IOException("Unmatched ')'");
+ }
+ st.pop();
+ if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
+ throw new IOException("Identifier expected");
+ }
+ Node n = Node.forIdent(st.pop().getStr());
+ n.parse(args);
+ return new NodeToken(n);
+ }
+
+ /**
+ * Given an expression and an optional comparator, build a tree of
+ * InputFormats using the comparator to sort keys.
+ */
+ static Node parse(String expr, Class<? extends WritableComparator> cmpcl)
+ throws IOException {
+ if (null == expr) {
+ throw new IOException("Expression is null");
+ }
+ Lexer lex = new Lexer(expr);
+ Stack<Token> st = new Stack<Token>();
+ Token tok;
+ while ((tok = lex.next()) != null) {
+ if (TType.RPAREN.equals(tok.getType())) {
+ st.push(reduce(st));
+ } else {
+ st.push(tok);
+ }
+ }
+ if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
+ Node ret = st.pop().getNode();
+ if (cmpcl != null) {
+ ret.setKeyComparator(cmpcl);
+ }
+ return ret;
+ }
+ throw new IOException("Missing ')'");
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This defines an interface to a stateful Iterator that can replay elements
+ * added to it directly.
+ * Note that this does not extend {@link java.util.Iterator}.
+ */
+public interface ResetableIterator<T extends Writable> {
+
+  /**
+   * True iff a call to next will succeed.
+   */
+  boolean hasNext();
+
+  /**
+   * Assign next value to actual.
+   * It is required that elements added to a ResetableIterator be returned in
+   * the same order after a call to {@link #reset} (FIFO).
+   *
+   * Note that a call to this may fail for nested joins (i.e. more elements
+   * available, but none satisfying the constraints of the join)
+   */
+  boolean next(T val) throws IOException;
+
+  /**
+   * Assign last value returned to actual.
+   */
+  void replay(T val) throws IOException;
+
+  /**
+   * Set iterator to return to the start of its range. Must be called after
+   * calling {@link #add} to avoid a ConcurrentModificationException.
+   */
+  void reset();
+
+  /**
+   * Add an element to the collection of elements to iterate over.
+   */
+  void add(T item) throws IOException;
+
+  /**
+   * Close datasources and release resources. Calling methods on the iterator
+   * after calling close has undefined behavior.
+   */
+  // XXX is this necessary?
+  void close() throws IOException;
+
+  /**
+   * Close datasources, but do not release internal resources. Calling this
+   * method should permit the object to be reused with a different datasource.
+   */
+  void clear();
+
+  /**
+   * Iterator over nothing. It never has a next element; reset, clear and
+   * close do nothing; next, replay and add are unsupported and throw
+   * UnsupportedOperationException.
+   */
+  public static class EMPTY<U extends Writable>
+      implements ResetableIterator<U> {
+
+    public boolean hasNext() {
+      return false;
+    }
+
+    public boolean next(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void replay(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void add(U item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void reset() { }
+
+    public void clear() { }
+
+    public void close() throws IOException { }
+  }
+
+}