You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/24 08:19:23 UTC
svn commit: r807096 [1/2] - in /hadoop/mapreduce/trunk: ./
src/examples/org/apache/hadoop/examples/
src/java/org/apache/hadoop/mapred/join/
src/java/org/apache/hadoop/mapreduce/lib/join/ src/test/
src/test/mapred/org/apache/hadoop/mapreduce/ src/test/m...
Author: cdouglas
Date: Mon Aug 24 06:19:21 2009
New Revision: 807096
URL: http://svn.apache.org/viewvc?rev=807096&view=rev
Log:
MAPREDUCE-355. Update mapred.join package to use the new API. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html
- copied, changed from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/package.html
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestTupleWritable.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java
- copied, changed from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestWrappedRecordReaderClassloader.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 24 06:19:21 2009
@@ -243,6 +243,9 @@
add mapred.reduce.child.log.level
(acmurthy)
+ MAPREDUCE-355. Update mapred.join package to use the new API. (Amareshwari
+ Sriramadasu via cdouglas)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Join.java Mon Aug 24 06:19:21 2009
@@ -27,19 +27,23 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.join.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
- * This is the trivial map/reduce program that does absolutely nothing
- * other than use the framework to fragment and sort the input values.
+ * Given a set of sorted datasets keyed with the same class and yielding
+ * equal partitions, it is possible to effect a join of those datasets
+ * prior to the map. This example demonstrates such a join.
*
* To run: bin/hadoop jar build/hadoop-examples.jar join
- * [-m <i>maps</i>] [-r <i>reduces</i>]
+ * [-r <i>reduces</i>]
* [-inFormat <i>input format class</i>]
* [-outFormat <i>output format class</i>]
* [-outKey <i>output key class</i>]
@@ -50,7 +54,7 @@
public class Join extends Configured implements Tool {
static int printUsage() {
- System.out.println("join [-m <maps>] [-r <reduces>] " +
+ System.out.println("join [-r <reduces>] " +
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
@@ -58,7 +62,7 @@
"[-joinOp <inner|outer|override>] " +
"[input]* <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
- return -1;
+ return 2;
}
/**
@@ -67,23 +71,24 @@
* @throws IOException When there are communication problems with the
* job tracker.
*/
+ @SuppressWarnings("unchecked")
public int run(String[] args) throws Exception {
- JobConf jobConf = new JobConf(getConf(), Sort.class);
- jobConf.setJobName("join");
-
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setReducerClass(IdentityReducer.class);
-
- JobClient client = new JobClient(jobConf);
+ Configuration conf = getConf();
+ JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
- int num_maps = cluster.getTaskTrackers() *
- jobConf.getInt("test.sort.maps_per_host", 10);
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
- String sort_reduces = jobConf.get("test.sort.reduces_per_host");
- if (sort_reduces != null) {
+ String join_reduces = conf.get("mapreduce.join.reduces_per_host");
+ if (join_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
- Integer.parseInt(sort_reduces);
+ Integer.parseInt(join_reduces);
}
+ Job job = new Job(conf);
+ job.setJobName("join");
+ job.setJarByClass(Sort.class);
+
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
Class<? extends InputFormat> inputFormatClass =
SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
@@ -94,9 +99,7 @@
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
- if ("-m".equals(args[i])) {
- num_maps = Integer.parseInt(args[++i]);
- } else if ("-r".equals(args[i])) {
+ if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
} else if ("-inFormat".equals(args[i])) {
inputFormatClass =
@@ -126,37 +129,37 @@
}
// Set user-supplied (possibly default) job configs
- jobConf.setNumMapTasks(num_maps);
- jobConf.setNumReduceTasks(num_reduces);
+ job.setNumReduceTasks(num_reduces);
if (otherArgs.size() < 2) {
System.out.println("ERROR: Wrong number of parameters: ");
return printUsage();
}
- FileOutputFormat.setOutputPath(jobConf,
+ FileOutputFormat.setOutputPath(job,
new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
plist.add(new Path(s));
}
- jobConf.setInputFormat(CompositeInputFormat.class);
- jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
- op, inputFormatClass, plist.toArray(new Path[0])));
- jobConf.setOutputFormat(outputFormatClass);
+ job.setInputFormatClass(CompositeInputFormat.class);
+ job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
+ CompositeInputFormat.compose(op, inputFormatClass,
+ plist.toArray(new Path[0])));
+ job.setOutputFormatClass(outputFormatClass);
- jobConf.setOutputKeyClass(outputKeyClass);
- jobConf.setOutputValueClass(outputValueClass);
+ job.setOutputKeyClass(outputKeyClass);
+ job.setOutputValueClass(outputValueClass);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- JobClient.runJob(jobConf);
+ int ret = job.waitForCompletion(true) ? 0 : 1 ;
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
- return 0;
+ return ret;
}
public static void main(String[] args) throws Exception {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -17,73 +17,28 @@
*/
package org.apache.hadoop.mapred.join;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
/**
* This class provides an implementation of ResetableIterator. The
* implementation uses an {@link java.util.ArrayList} to store elements
* added to it, replaying them as requested.
* Prefer {@link StreamBackedIterator}.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator} instead
*/
-public class ArrayListBackedIterator<X extends Writable>
+@Deprecated
+public class ArrayListBackedIterator<X extends Writable> extends
+ org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator<X>
implements ResetableIterator<X> {
- private Iterator<X> iter;
- private ArrayList<X> data;
- private X hold = null;
-
public ArrayListBackedIterator() {
- this(new ArrayList<X>());
+ super();
}
public ArrayListBackedIterator(ArrayList<X> data) {
- this.data = data;
- this.iter = this.data.iterator();
- }
-
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- public boolean next(X val) throws IOException {
- if (iter.hasNext()) {
- WritableUtils.cloneInto(val, iter.next());
- if (null == hold) {
- hold = WritableUtils.clone(val, null);
- } else {
- WritableUtils.cloneInto(hold, val);
- }
- return true;
- }
- return false;
- }
-
- public boolean replay(X val) throws IOException {
- WritableUtils.cloneInto(val, hold);
- return true;
- }
-
- public void reset() {
- iter = data.iterator();
- }
-
- public void add(X item) throws IOException {
- data.add(WritableUtils.clone(item, null));
- }
-
- public void close() throws IOException {
- iter = null;
- data = null;
- }
-
- public void clear() {
- data.clear();
- reset();
+ super(data);
}
-
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java Mon Aug 24 06:19:21 2009
@@ -30,7 +30,11 @@
/**
* Refinement of InputFormat requiring implementors to provide
* ComposableRecordReader instead of RecordReader.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableInputFormat} instead
*/
+@Deprecated
public interface ComposableInputFormat<K extends WritableComparable,
V extends Writable>
extends InputFormat<K,V> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,10 @@
/**
* Additional operations required of a RecordReader to participate in a join.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableRecordReader} instead
*/
+@Deprecated
public interface ComposableRecordReader<K extends WritableComparable,
V extends Writable>
extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Mon Aug 24 06:19:21 2009
@@ -44,7 +44,10 @@
* in the join.
* @see JoinRecordReader
* @see MultiFilterRecordReader
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat} instead
*/
+@Deprecated
public class CompositeInputFormat<K extends WritableComparable>
implements ComposableInputFormat<K,TupleWritable> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java Mon Aug 24 06:19:21 2009
@@ -31,7 +31,11 @@
/**
* This InputSplit contains a set of child InputSplits. Any InputSplit inserted
* into this collection must have a public default constructor.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit} instead
*/
+@Deprecated
public class CompositeInputSplit implements InputSplit {
private int fill = 0;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Mon Aug 24 06:19:21 2009
@@ -34,7 +34,11 @@
/**
* A RecordReader that can effect joins of RecordReaders sharing a common key
* type and partitioning.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeRecordReader} instead
*/
+@Deprecated
public abstract class CompositeRecordReader<
K extends WritableComparable, // key type
V extends Writable, // accepts RecordReader<K,V> as children
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,11 @@
/**
* Full inner join.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.InnerJoinRecordReader} instead.
*/
+@Deprecated
public class InnerJoinRecordReader<K extends WritableComparable>
extends JoinRecordReader<K> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -29,7 +29,10 @@
/**
* Base class for Composite joins returning Tuples of arbitrary Writables.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.JoinRecordReader} instead
*/
+@Deprecated
public abstract class JoinRecordReader<K extends WritableComparable>
extends CompositeRecordReader<K,Writable,TupleWritable>
implements ComposableRecordReader<K,TupleWritable> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Mon Aug 24 06:19:21 2009
@@ -32,7 +32,10 @@
/**
* Base class for Composite join returning values derived from multiple
* sources, but generally not tuples.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.MultiFilterRecordReader} instead
*/
+@Deprecated
public abstract class MultiFilterRecordReader<K extends WritableComparable,
V extends Writable>
extends CompositeRecordReader<K,V,V>
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -26,7 +26,11 @@
/**
* Full outer join.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.OuterJoinRecordReader} instead
*/
+@Deprecated
public class OuterJoinRecordReader<K extends WritableComparable>
extends JoinRecordReader<K> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java Mon Aug 24 06:19:21 2009
@@ -32,7 +32,10 @@
* For example, <tt>override(S1,S2,S3)</tt> will prefer values
* from S3 over S2, and values from S2 over S1 for all keys
* emitted from all sources.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.OverrideRecordReader} instead
*/
+@Deprecated
public class OverrideRecordReader<K extends WritableComparable,
V extends Writable>
extends MultiFilterRecordReader<K,V> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java Mon Aug 24 06:19:21 2009
@@ -59,7 +59,9 @@
* straightforward. One need only override the relevant method(s) (usually only
* {@link CompositeRecordReader#combine}) and include a property to map its
* value to an identifier in the parser.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.join.Parser} instead
*/
+@Deprecated
public class Parser {
public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java Mon Aug 24 06:19:21 2009
@@ -17,77 +17,22 @@
*/
package org.apache.hadoop.mapred.join;
-import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
/**
* This defines an interface to a stateful Iterator that can replay elements
* added to it directly.
* Note that this does not extend {@link java.util.Iterator}.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.ResetableIterator} instead
*/
-public interface ResetableIterator<T extends Writable> {
+@Deprecated
+public interface ResetableIterator<T extends Writable>
+ extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator<T> {
public static class EMPTY<U extends Writable>
- implements ResetableIterator<U> {
- public boolean hasNext() { return false; }
- public void reset() { }
- public void close() throws IOException { }
- public void clear() { }
- public boolean next(U val) throws IOException {
- return false;
- }
- public boolean replay(U val) throws IOException {
- return false;
- }
- public void add(U item) throws IOException {
- throw new UnsupportedOperationException();
- }
+ extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator.EMPTY<U>
+ implements ResetableIterator<U> {
}
-
- /**
- * True if a call to next may return a value. False positives are
- * permitted, but false negatives are not.
- */
- public boolean hasNext();
-
- /**
- * Assign next value to actual.
- * It is required that elements added to a ResetableIterator be returned in
- * the same order after a call to {@link #reset} (FIFO).
- *
- * Note that a call to this may fail for nested joins (i.e. more elements
- * available, but none satisfying the constraints of the join)
- */
- public boolean next(T val) throws IOException;
-
- /**
- * Assign last value returned to actual.
- */
- public boolean replay(T val) throws IOException;
-
- /**
- * Set iterator to return to the start of its range. Must be called after
- * calling {@link #add} to avoid a ConcurrentModificationException.
- */
- public void reset();
-
- /**
- * Add an element to the collection of elements to iterate over.
- */
- public void add(T item) throws IOException;
-
- /**
- * Close datasources and release resources. Calling methods on the iterator
- * after calling close has undefined behavior.
- */
- // XXX is this necessary?
- public void close() throws IOException;
-
- /**
- * Close datasources, but do not release internal resources. Calling this
- * method should permit the object to be reused with a different datasource.
- */
- public void clear();
-
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -17,83 +17,17 @@
*/
package org.apache.hadoop.mapred.join;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
/**
* This class provides an implementation of ResetableIterator. This
* implementation uses a byte array to store elements added to it.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator} instead
*/
+@Deprecated
public class StreamBackedIterator<X extends Writable>
+ extends org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator<X>
implements ResetableIterator<X> {
-
- private static class ReplayableByteInputStream extends ByteArrayInputStream {
- public ReplayableByteInputStream(byte[] arr) {
- super(arr);
- }
- public void resetStream() {
- mark = 0;
- reset();
- }
- }
-
- private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
- private DataOutputStream outfbuf = new DataOutputStream(outbuf);
- private ReplayableByteInputStream inbuf;
- private DataInputStream infbuf;
-
- public StreamBackedIterator() { }
-
- public boolean hasNext() {
- return infbuf != null && inbuf.available() > 0;
- }
-
- public boolean next(X val) throws IOException {
- if (hasNext()) {
- inbuf.mark(0);
- val.readFields(infbuf);
- return true;
- }
- return false;
- }
-
- public boolean replay(X val) throws IOException {
- inbuf.reset();
- if (0 == inbuf.available())
- return false;
- val.readFields(infbuf);
- return true;
- }
-
- public void reset() {
- if (null != outfbuf) {
- inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
- infbuf = new DataInputStream(inbuf);
- outfbuf = null;
- }
- inbuf.resetStream();
- }
-
- public void add(X item) throws IOException {
- item.write(outfbuf);
- }
-
- public void close() throws IOException {
- if (null != infbuf)
- infbuf.close();
- if (null != outfbuf)
- outfbuf.close();
- }
-
- public void clear() {
- if (null != inbuf)
- inbuf.resetStream();
- outbuf.reset();
- outfbuf = new DataOutputStream(outbuf);
- }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Mon Aug 24 06:19:21 2009
@@ -18,16 +18,7 @@
package org.apache.hadoop.mapred.join;
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
/**
* Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
@@ -40,17 +31,19 @@
* incompatible with, but contrary to the general case.
*
* @see org.apache.hadoop.io.Writable
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.TupleWritable} instead
*/
-public class TupleWritable implements Writable, Iterable<Writable> {
-
- private BitSet written;
- private Writable[] values;
+@Deprecated
+public class TupleWritable
+ extends org.apache.hadoop.mapreduce.lib.join.TupleWritable {
/**
* Create an empty tuple with no allocated storage for writables.
*/
public TupleWritable() {
- written = new BitSet(0);
+ super();
}
/**
@@ -58,148 +51,7 @@
* "written" values.
*/
public TupleWritable(Writable[] vals) {
- written = new BitSet(vals.length);
- values = vals;
- }
-
- /**
- * Return true if tuple has an element at the position provided.
- */
- public boolean has(int i) {
- return written.get(i);
- }
-
- /**
- * Get ith Writable from Tuple.
- */
- public Writable get(int i) {
- return values[i];
- }
-
- /**
- * The number of children in this Tuple.
- */
- public int size() {
- return values.length;
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean equals(Object other) {
- if (other instanceof TupleWritable) {
- TupleWritable that = (TupleWritable)other;
- if (!this.written.equals(that.written)) {
- return false;
- }
- for (int i = 0; i < values.length; ++i) {
- if (!has(i)) continue;
- if (!values[i].equals(that.get(i))) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- public int hashCode() {
- assert false : "hashCode not designed";
- return written.hashCode();
- }
-
- /**
- * Return an iterator over the elements in this tuple.
- * Note that this doesn't flatten the tuple; one may receive tuples
- * from this iterator.
- */
- public Iterator<Writable> iterator() {
- final TupleWritable t = this;
- return new Iterator<Writable>() {
- int bitIndex = written.nextSetBit(0);
- public boolean hasNext() {
- return bitIndex >= 0;
- }
- public Writable next() {
- int returnIndex = bitIndex;
- if (returnIndex < 0)
- throw new NoSuchElementException();
- bitIndex = written.nextSetBit(bitIndex+1);
- return t.get(returnIndex);
- }
- public void remove() {
- if (!written.get(bitIndex)) {
- throw new IllegalStateException("Attempt to remove non-existent val");
- }
- written.clear(bitIndex);
- }
- };
- }
-
- /**
- * Convert Tuple to String as in the following.
- * <tt>[<child1>,<child2>,...,<childn>]</tt>
- */
- public String toString() {
- StringBuffer buf = new StringBuffer("[");
- for (int i = 0; i < values.length; ++i) {
- buf.append(has(i) ? values[i].toString() : "");
- buf.append(",");
- }
- if (values.length != 0)
- buf.setCharAt(buf.length() - 1, ']');
- else
- buf.append(']');
- return buf.toString();
- }
-
- // Writable
-
- /** Writes each Writable to <code>out</code>.
- * TupleWritable format:
- * {@code
- * <count><type1><type2>...<typen><obj1><obj2>...<objn>
- * }
- */
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, values.length);
- writeBitSet(out, values.length, written);
- for (int i = 0; i < values.length; ++i) {
- Text.writeString(out, values[i].getClass().getName());
- }
- for (int i = 0; i < values.length; ++i) {
- if (has(i)) {
- values[i].write(out);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @SuppressWarnings("unchecked") // No static typeinfo on Tuples
- public void readFields(DataInput in) throws IOException {
- int card = WritableUtils.readVInt(in);
- values = new Writable[card];
- readBitSet(in, card, written);
- Class<? extends Writable>[] cls = new Class[card];
- try {
- for (int i = 0; i < card; ++i) {
- cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
- }
- for (int i = 0; i < card; ++i) {
- values[i] = cls[i].newInstance();
- if (has(i)) {
- values[i].readFields(in);
- }
- }
- } catch (ClassNotFoundException e) {
- throw (IOException)new IOException("Failed tuple init").initCause(e);
- } catch (IllegalAccessException e) {
- throw (IOException)new IOException("Failed tuple init").initCause(e);
- } catch (InstantiationException e) {
- throw (IOException)new IOException("Failed tuple init").initCause(e);
- }
+ super(vals);
}
/**
@@ -225,64 +77,5 @@
written.clear();
}
- /**
- * Writes the bit set to the stream. The first 64 bit-positions of the bit set
- * are written as a VLong for backwards-compatibility with older versions of
- * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8
- * bit-positions.
- */
- private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
- throws IOException {
- long bits = 0L;
-
- int bitSetIndex = bitSet.nextSetBit(0);
- for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
- bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
- bits |= 1L << bitSetIndex;
- }
- WritableUtils.writeVLong(stream,bits);
-
- if (nbits > Long.SIZE) {
- bits = 0L;
- for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits;
- bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
- int bitsIndex = bitSetIndex % Byte.SIZE;
- int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
- if (word > lastWordWritten) {
- stream.writeByte((byte)bits);
- bits = 0L;
- for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
- stream.writeByte((byte)bits);
- }
- }
- bits |= 1L << bitsIndex;
- }
- stream.writeByte((byte)bits);
- }
- }
- /**
- * Reads a bitset from the stream that has been written with
- * {@link #writeBitSet(DataOutput, int, BitSet)}.
- */
- private static final void readBitSet(DataInput stream, int nbits,
- BitSet bitSet) throws IOException {
- bitSet.clear();
- long initialBits = WritableUtils.readVLong(stream);
- long last = 0L;
- while (0L != initialBits) {
- last = Long.lowestOneBit(initialBits);
- initialBits ^= last;
- bitSet.set(Long.numberOfTrailingZeros(last));
- }
-
- for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
- byte bits = stream.readByte();
- while (0 != bits) {
- last = Long.lowestOneBit(bits);
- bits ^= last;
- bitSet.set(Long.numberOfTrailingZeros(last) + offset);
- }
- }
- }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Mon Aug 24 06:19:21 2009
@@ -31,7 +31,10 @@
* This class keeps track of the "head" key-value pair for the
* provided RecordReader and keeps a store of values matching a key when
* this source is participating in a join.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader} instead
*/
+@Deprecated
public class WrappedRecordReader<K extends WritableComparable,
U extends Writable>
implements ComposableRecordReader<K,U> {
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ArrayListBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -15,14 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* This class provides an implementation of ResetableIterator. The
@@ -36,6 +38,7 @@
private Iterator<X> iter;
private ArrayList<X> data;
private X hold = null;
+ private Configuration conf = new Configuration();
public ArrayListBackedIterator() {
this(new ArrayList<X>());
@@ -52,11 +55,11 @@
public boolean next(X val) throws IOException {
if (iter.hasNext()) {
- WritableUtils.cloneInto(val, iter.next());
+ ReflectionUtils.copy(conf, iter.next(), val);
if (null == hold) {
hold = WritableUtils.clone(val, null);
} else {
- WritableUtils.cloneInto(hold, val);
+ ReflectionUtils.copy(conf, val, hold);
}
return true;
}
@@ -64,7 +67,7 @@
}
public boolean replay(X val) throws IOException {
- WritableUtils.cloneInto(val, hold);
+ ReflectionUtils.copy(conf, hold, val);
return true;
}
@@ -85,5 +88,4 @@
data.clear();
reset();
}
-
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableInputFormat.java Mon Aug 24 06:19:21 2009
@@ -16,25 +16,26 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Refinement of InputFormat requiring implementors to provide
* ComposableRecordReader instead of RecordReader.
*/
-public interface ComposableInputFormat<K extends WritableComparable,
- V extends Writable>
+public abstract class ComposableInputFormat<K extends WritableComparable<?>,
+ V extends Writable>
extends InputFormat<K,V> {
- ComposableRecordReader<K,V> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter) throws IOException;
+ public abstract ComposableRecordReader<K,V> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException;
+
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ComposableRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,50 +16,63 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.RecordReader;
/**
* Additional operations required of a RecordReader to participate in a join.
*/
-public interface ComposableRecordReader<K extends WritableComparable,
- V extends Writable>
- extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
+public abstract class ComposableRecordReader<K extends WritableComparable<?>,
+ V extends Writable>
+ extends RecordReader<K,V>
+ implements Comparable<ComposableRecordReader<K,?>> {
/**
* Return the position in the collector this class occupies.
*/
- int id();
+ abstract int id();
/**
* Return the key this RecordReader would supply on a call to next(K,V)
*/
- K key();
+ abstract K key();
/**
* Clone the key at the head of this RecordReader into the object provided.
*/
- void key(K key) throws IOException;
+ abstract void key(K key) throws IOException;
/**
+ * Create instance of key.
+ */
+ abstract K createKey();
+
+ /**
+ * Create instance of value.
+ */
+ abstract V createValue();
+
+ /**
* Returns true if the stream is not empty, but provides no guarantee that
* a call to next(K,V) will succeed.
*/
- boolean hasNext();
+ abstract boolean hasNext();
/**
* Skip key-value pairs with keys less than or equal to the key provided.
*/
- void skip(K key) throws IOException;
+ abstract void skip(K key) throws IOException, InterruptedException;
/**
* While key-value pairs from this RecordReader match the given key, register
* them with the JoinCollector provided.
*/
- void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+ @SuppressWarnings("unchecked")
+ abstract void accept(CompositeRecordReader.JoinCollector jc, K key)
+ throws IOException, InterruptedException;
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Mon Aug 24 06:19:21 2009
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* An InputFormat capable of performing joins over a set of data sources sorted
@@ -37,17 +40,21 @@
* @see #setFormat
*
* A user may define new join types by setting the property
- * <tt>mapred.join.define.<ident></tt> to a classname. In the expression
- * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
- * ComposableRecordReader.
- * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
- * in the join.
+ * <tt>mapreduce.join.define.<ident></tt> to a classname.
+ * In the expression <tt>mapreduce.join.expr</tt>, the identifier will be
+ * assumed to be a ComposableRecordReader.
+ * <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare
+ * keys in the join.
* @see JoinRecordReader
* @see MultiFilterRecordReader
*/
+@SuppressWarnings("unchecked")
public class CompositeInputFormat<K extends WritableComparable>
- implements ComposableInputFormat<K,TupleWritable> {
+ extends InputFormat<K, TupleWritable> {
+ public static final String JOIN_EXPR = "mapreduce.join.expr";
+ public static final String JOIN_COMPARATOR = "mapreduce.join.keycomparator";
+
// expression parse tree to which IF requests are proxied
private Parser.Node root;
@@ -62,16 +69,16 @@
* class ::= @see java.lang.Class#forName(java.lang.String)
* path ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
* }
- * Reads expression from the <tt>mapred.join.expr</tt> property and
- * user-supplied join types from <tt>mapred.join.define.<ident></tt>
+ * Reads expression from the <tt>mapreduce.join.expr</tt> property and
+ * user-supplied join types from <tt>mapreduce.join.define.<ident></tt>
* types. Paths supplied to <tt>tbl</tt> are given as input paths to the
* InputFormat class listed.
* @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
*/
- public void setFormat(JobConf job) throws IOException {
+ public void setFormat(Configuration conf) throws IOException {
addDefaults();
- addUserIdentifiers(job);
- root = Parser.parse(job.get("mapred.join.expr", null), job);
+ addUserIdentifiers(conf);
+ root = Parser.parse(conf.get(JOIN_EXPR, null), conf);
}
/**
@@ -91,17 +98,16 @@
/**
* Inform the parser of user-defined types.
*/
- private void addUserIdentifiers(JobConf job) throws IOException {
- Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
- for (Map.Entry<String,String> kv : job) {
+ private void addUserIdentifiers(Configuration conf) throws IOException {
+ Pattern x = Pattern.compile("^mapreduce\\.join\\.define\\.(\\w+)$");
+ for (Map.Entry<String,String> kv : conf) {
Matcher m = x.matcher(kv.getKey());
if (m.matches()) {
try {
Parser.CNode.addIdentifier(m.group(1),
- job.getClass(m.group(0), null, ComposableRecordReader.class));
+ conf.getClass(m.group(0), null, ComposableRecordReader.class));
} catch (NoSuchMethodException e) {
- throw (IOException)new IOException(
- "Invalid define for " + m.group(1)).initCause(e);
+ throw new IOException("Invalid define for " + m.group(1), e);
}
}
}
@@ -111,10 +117,12 @@
* Build a CompositeInputSplit from the child InputFormats by assigning the
* ith split from each child to the ith composite split.
*/
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- setFormat(job);
- job.setLong("mapred.min.split.size", Long.MAX_VALUE);
- return root.getSplits(job, numSplits);
+ @SuppressWarnings("unchecked")
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException, InterruptedException {
+ setFormat(job.getConfiguration());
+ job.getConfiguration().setLong("mapred.min.split.size", Long.MAX_VALUE);
+ return root.getSplits(job);
}
/**
@@ -124,10 +132,11 @@
* Mandating TupleWritable isn't strictly correct.
*/
@SuppressWarnings("unchecked") // child types unknown
- public ComposableRecordReader<K,TupleWritable> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
- setFormat(job);
- return root.getRecordReader(split, job, reporter);
+ public RecordReader<K,TupleWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ setFormat(taskContext.getConfiguration());
+ return root.createRecordReader(split, taskContext);
}
/**
@@ -135,8 +144,10 @@
* Given InputFormat class (inf), path (p) return:
* {@code tbl(<inf>, <p>) }
*/
- public static String compose(Class<? extends InputFormat> inf, String path) {
- return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+ public static String compose(Class<? extends InputFormat> inf,
+ String path) {
+ return compose(inf.getName().intern(), path,
+ new StringBuffer()).toString();
}
/**
@@ -144,8 +155,8 @@
* Given operation (op), Object class (inf), set of paths (p) return:
* {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
*/
- public static String compose(String op, Class<? extends InputFormat> inf,
- String... path) {
+ public static String compose(String op,
+ Class<? extends InputFormat> inf, String... path) {
final String infname = inf.getName();
StringBuffer ret = new StringBuffer(op + '(');
for (String p : path) {
@@ -161,8 +172,8 @@
* Given operation (op), Object class (inf), set of paths (p) return:
* {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
*/
- public static String compose(String op, Class<? extends InputFormat> inf,
- Path... path) {
+ public static String compose(String op,
+ Class<? extends InputFormat> inf, Path... path) {
ArrayList<String> tmp = new ArrayList<String>(path.length);
for (Path p : path) {
tmp.add(p.toString());
@@ -177,5 +188,4 @@
sb.append("\")");
return sb;
}
-
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Mon Aug 24 06:19:21 2009
@@ -16,27 +16,33 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashSet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.util.ReflectionUtils;
/**
* This InputSplit contains a set of child InputSplits. Any InputSplit inserted
* into this collection must have a public default constructor.
*/
-public class CompositeInputSplit implements InputSplit {
+public class CompositeInputSplit extends InputSplit implements Writable {
private int fill = 0;
private long totsize = 0L;
private InputSplit[] splits;
+ private Configuration conf = new Configuration();
public CompositeInputSplit() { }
@@ -49,7 +55,7 @@
* @throws IOException If capacity was not specified during construction
* or if capacity has been reached.
*/
- public void add(InputSplit s) throws IOException {
+ public void add(InputSplit s) throws IOException, InterruptedException {
if (null == splits) {
throw new IOException("Uninitialized InputSplit");
}
@@ -77,14 +83,14 @@
/**
* Get the length of ith child InputSplit.
*/
- public long getLength(int i) throws IOException {
+ public long getLength(int i) throws IOException, InterruptedException {
return splits[i].getLength();
}
/**
* Collect a set of hosts from all child InputSplits.
*/
- public String[] getLocations() throws IOException {
+ public String[] getLocations() throws IOException, InterruptedException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
@@ -100,7 +106,7 @@
/**
* getLocations from ith InputSplit.
*/
- public String[] getLocation(int i) throws IOException {
+ public String[] getLocation(int i) throws IOException, InterruptedException {
return splits[i].getLocations();
}
@@ -110,20 +116,26 @@
* <count><class1><class2>...<classn><split1><split2>...<splitn>
* }
*/
+ @SuppressWarnings("unchecked")
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, splits.length);
for (InputSplit s : splits) {
Text.writeString(out, s.getClass().getName());
}
for (InputSplit s : splits) {
- s.write(out);
+ SerializationFactory factory = new SerializationFactory(conf);
+ Serializer serializer =
+ factory.getSerializer(s.getClass());
+ serializer.open((DataOutputStream)out);
+ serializer.serialize(s);
+ serializer.close();
}
}
/**
* {@inheritDoc}
* @throws IOException If the child InputSplit cannot be read, typically
- * for faliing access checks.
+ * for failing access checks.
*/
@SuppressWarnings("unchecked") // Generic array assignment
public void readFields(DataInput in) throws IOException {
@@ -139,11 +151,14 @@
}
for (int i = 0; i < card; ++i) {
splits[i] = ReflectionUtils.newInstance(cls[i], null);
- splits[i].readFields(in);
+ SerializationFactory factory = new SerializationFactory(conf);
+ Deserializer deserializer = factory.getDeserializer(cls[i]);
+ deserializer.open((DataInputStream)in);
+ splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
+ deserializer.close();
}
} catch (ClassNotFoundException e) {
- throw (IOException)new IOException("Failed split init").initCause(e);
+ throw new IOException("Failed split init", e);
}
}
-
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java Mon Aug 24 06:19:21 2009
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.ArrayList;
@@ -24,11 +24,13 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -36,24 +38,28 @@
* type and partitioning.
*/
public abstract class CompositeRecordReader<
- K extends WritableComparable, // key type
- V extends Writable, // accepts RecordReader<K,V> as children
- X extends Writable> // emits Writables of this type
+ K extends WritableComparable<?>, // key type
+ V extends Writable, // accepts RecordReader<K,V> as children
+ X extends Writable> // emits Writables of this type
+ extends ComposableRecordReader<K, X>
implements Configurable {
-
private int id;
- private Configuration conf;
+ protected Configuration conf;
private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
private WritableComparator cmp;
- private Class<? extends WritableComparable> keyclass;
+ @SuppressWarnings("unchecked")
+ protected Class<? extends WritableComparable> keyclass = null;
private PriorityQueue<ComposableRecordReader<K,?>> q;
protected final JoinCollector jc;
protected final ComposableRecordReader<K,? extends V>[] kids;
protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+ protected K key;
+ protected X value;
/**
* Create a RecordReader with <tt>capacity</tt> children to position
@@ -70,17 +76,56 @@
if (null != cmpcl) {
cmp = ReflectionUtils.newInstance(cmpcl, null);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
- new Comparator<ComposableRecordReader<K,?>>() {
- public int compare(ComposableRecordReader<K,?> o1,
- ComposableRecordReader<K,?> o2) {
- return cmp.compare(o1.key(), o2.key());
- }
- });
+ new Comparator<ComposableRecordReader<K,?>>() {
+ public int compare(ComposableRecordReader<K,?> o1,
+ ComposableRecordReader<K,?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
}
jc = new JoinCollector(capacity);
kids = new ComposableRecordReader[capacity];
}
+ @SuppressWarnings("unchecked")
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (kids != null) {
+ for (int i = 0; i < kids.length; ++i) {
+ kids[i].initialize(((CompositeInputSplit)split).get(i), context);
+ if (kids[i].key() == null) {
+ continue;
+ }
+
+ // get keyclass
+ if (keyclass == null) {
+ keyclass = kids[i].createKey().getClass().
+ asSubclass(WritableComparable.class);
+ }
+ // create priority queue
+ if (null == q) {
+ cmp = WritableComparator.get(keyclass);
+ q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+ new Comparator<ComposableRecordReader<K,?>>() {
+ public int compare(ComposableRecordReader<K,?> o1,
+ ComposableRecordReader<K,?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
+ }
+ // Explicit check for key class agreement
+ if (!keyclass.equals(kids[i].key().getClass())) {
+ throw new ClassCastException("Child key classes fail to agree");
+ }
+
+ // add the kid to priority queue if it has any elements
+ if (kids[i].hasNext()) {
+ q.add(kids[i]);
+ }
+ }
+ }
+ }
+
/**
* Return the position in the collector this class occupies.
*/
@@ -123,21 +168,9 @@
* entry will appear. Adding RecordReaders with the same id has
* undefined behavior.
*/
- public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
+ public void add(ComposableRecordReader<K,? extends V> rr)
+ throws IOException, InterruptedException {
kids[rr.id()] = rr;
- if (null == q) {
- cmp = WritableComparator.get(rr.createKey().getClass());
- q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
- new Comparator<ComposableRecordReader<K,?>>() {
- public int compare(ComposableRecordReader<K,?> o1,
- ComposableRecordReader<K,?> o2) {
- return cmp.compare(o1.key(), o2.key());
- }
- });
- }
- if (rr.hasNext()) {
- q.add(rr);
- }
}
/**
@@ -146,7 +179,7 @@
* one or more child RR contain duplicate keys, this will emit the cross
* product of the associated values until exhausted.
*/
- class JoinCollector {
+ public class JoinCollector {
private K key;
private ResetableIterator<X>[] iters;
private int pos = -1;
@@ -208,7 +241,7 @@
/**
* Returns false if exhausted or if reset(K) has not been called.
*/
- protected boolean hasNext() {
+ public boolean hasNext() {
return !(pos < 0);
}
@@ -218,7 +251,7 @@
* sources s_1...s_n sharing key k, repeated calls to next should yield
* I x I.
*/
- @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+ @SuppressWarnings("unchecked") // No static type info on Tuples
protected boolean next(TupleWritable val) throws IOException {
if (first) {
int i = -1;
@@ -321,9 +354,13 @@
* Clone the key at the top of this RR into the given object.
*/
public void key(K key) throws IOException {
- WritableUtils.cloneInto(key, key());
+ ReflectionUtils.copy(conf, key(), key);
}
+ public K getCurrentKey() { // accessor: returns this reader's key field directly (same reference, no copy is made)
+ return key;
+ }
+
/**
* Return true if it is possible that this could emit more values.
*/
@@ -334,7 +371,7 @@
/**
* Pass skip key to child RRs.
*/
- public void skip(K key) throws IOException {
+ public void skip(K key) throws IOException, InterruptedException {
ArrayList<ComposableRecordReader<K,?>> tmp =
new ArrayList<ComposableRecordReader<K,?>>();
while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
@@ -359,8 +396,9 @@
* iterator over values it may emit.
*/
@SuppressWarnings("unchecked") // No values from static EMPTY class
+ @Override
public void accept(CompositeRecordReader.JoinCollector jc, K key)
- throws IOException {
+ throws IOException, InterruptedException {
if (hasNext() && 0 == cmp.compare(key, key())) {
fillJoinCollector(createKey());
jc.add(id, getDelegate());
@@ -373,7 +411,8 @@
* For all child RRs offering the key provided, obtain an iterator
* at that position in the JoinCollector.
*/
- protected void fillJoinCollector(K iterkey) throws IOException {
+ protected void fillJoinCollector(K iterkey)
+ throws IOException, InterruptedException {
if (!q.isEmpty()) {
q.peek().key(iterkey);
while (0 == cmp.compare(q.peek().key(), iterkey)) {
@@ -397,19 +436,13 @@
}
/**
- * Create a new key value common to all child RRs.
+ * Create a new key common to all child RRs.
* @throws ClassCastException if key classes differ.
*/
- @SuppressWarnings("unchecked") // Explicit check for key class agreement
- public K createKey() {
- if (null == keyclass) {
- final Class<?> cls = kids[0].createKey().getClass();
- for (RecordReader<K,? extends Writable> rr : kids) {
- if (!cls.equals(rr.createKey().getClass())) {
- throw new ClassCastException("Child key classes fail to agree");
- }
- }
- keyclass = cls.asSubclass(WritableComparable.class);
+ @SuppressWarnings("unchecked")
+ protected K createKey() {
+ if (keyclass == null || keyclass.equals(NullWritable.class)) {
+ return (K) NullWritable.get();
}
return (K) ReflectionUtils.newInstance(keyclass, getConf());
}
@@ -417,7 +450,7 @@
/**
* Create a value to be used internally for joins.
*/
- protected TupleWritable createInternalValue() {
+ protected TupleWritable createTupleWritable() {
Writable[] vals = new Writable[kids.length];
for (int i = 0; i < vals.length; ++i) {
vals[i] = kids[i].createValue();
@@ -425,11 +458,10 @@
return new TupleWritable(vals);
}
- /**
- * Unsupported (returns zero in all cases).
- */
- public long getPos() throws IOException {
- return 0;
+ /** {@inheritDoc} */
+ public X getCurrentValue()
+ throws IOException, InterruptedException {
+ return value;
}
/**
@@ -449,11 +481,12 @@
/**
* Report progress as the minimum of all child RR progress.
*/
- public float getProgress() throws IOException {
+ public float getProgress() throws IOException, InterruptedException {
float ret = 1.0f;
for (RecordReader<K,? extends Writable> rr : kids) {
ret = Math.min(ret, rr.getProgress());
}
return ret;
}
+
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/InnerJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,21 +16,21 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
/**
* Full inner join.
*/
-public class InnerJoinRecordReader<K extends WritableComparable>
+public class InnerJoinRecordReader<K extends WritableComparable<?>>
extends JoinRecordReader<K> {
- InnerJoinRecordReader(int id, JobConf conf, int capacity,
+ InnerJoinRecordReader(int id, Configuration conf, int capacity,
Class<? extends WritableComparator> cmpcl) throws IOException {
super(id, conf, capacity, cmpcl);
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/JoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,25 +16,24 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.PriorityQueue;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Base class for Composite joins returning Tuples of arbitrary Writables.
*/
-public abstract class JoinRecordReader<K extends WritableComparable>
- extends CompositeRecordReader<K,Writable,TupleWritable>
- implements ComposableRecordReader<K,TupleWritable> {
+public abstract class JoinRecordReader<K extends WritableComparable<?>>
+ extends CompositeRecordReader<K,Writable,TupleWritable> {
- public JoinRecordReader(int id, JobConf conf, int capacity,
+ public JoinRecordReader(int id, Configuration conf, int capacity,
Class<? extends WritableComparator> cmpcl) throws IOException {
super(id, capacity, cmpcl);
setConf(conf);
@@ -44,19 +43,27 @@
* Emit the next set of key, value pairs as defined by the child
* RecordReaders and operation associated with this composite RR.
*/
- public boolean next(K key, TupleWritable value) throws IOException {
+ public boolean nextKeyValue()
+ throws IOException, InterruptedException {
+ if (key == null) {
+ key = createKey();
+ }
if (jc.flush(value)) {
- WritableUtils.cloneInto(key, jc.key());
+ ReflectionUtils.copy(conf, jc.key(), key);
return true;
}
jc.clear();
+ if (value == null) {
+ value = createValue();
+ }
+ final PriorityQueue<ComposableRecordReader<K,?>> q =
+ getRecordReaderQueue();
K iterkey = createKey();
- final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
- while (!q.isEmpty()) {
+ while (q != null && !q.isEmpty()) {
fillJoinCollector(iterkey);
jc.reset(iterkey);
if (jc.flush(value)) {
- WritableUtils.cloneInto(key, jc.key());
+ ReflectionUtils.copy(conf, jc.key(), key);
return true;
}
jc.clear();
@@ -64,9 +71,8 @@
return false;
}
- /** {@inheritDoc} */
public TupleWritable createValue() {
- return createInternalValue();
+ return createTupleWritable();
}
/**
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/MultiFilterRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,32 +16,30 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.PriorityQueue;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
/**
* Base class for Composite join returning values derived from multiple
* sources, but generally not tuples.
*/
-public abstract class MultiFilterRecordReader<K extends WritableComparable,
+public abstract class MultiFilterRecordReader<K extends WritableComparable<?>,
V extends Writable>
- extends CompositeRecordReader<K,V,V>
- implements ComposableRecordReader<K,V> {
+ extends CompositeRecordReader<K,V,V> {
- private Class<? extends Writable> valueclass;
- private TupleWritable ivalue;
+ private TupleWritable ivalue = null;
- public MultiFilterRecordReader(int id, JobConf conf, int capacity,
+ public MultiFilterRecordReader(int id, Configuration conf, int capacity,
Class<? extends WritableComparator> cmpcl) throws IOException {
super(id, capacity, cmpcl);
setConf(conf);
@@ -65,21 +63,31 @@
}
/** {@inheritDoc} */
- public boolean next(K key, V value) throws IOException {
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (key == null) {
+ key = createKey();
+ }
+ if (value == null) {
+ value = createValue();
+ }
if (jc.flush(ivalue)) {
- WritableUtils.cloneInto(key, jc.key());
- WritableUtils.cloneInto(value, emit(ivalue));
+ ReflectionUtils.copy(conf, jc.key(), key);
+ ReflectionUtils.copy(conf, emit(ivalue), value);
return true;
}
+ if (ivalue == null) {
+ ivalue = createTupleWritable();
+ }
jc.clear();
+ final PriorityQueue<ComposableRecordReader<K,?>> q =
+ getRecordReaderQueue();
K iterkey = createKey();
- final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
- while (!q.isEmpty()) {
+ while (q != null && !q.isEmpty()) {
fillJoinCollector(iterkey);
jc.reset(iterkey);
if (jc.flush(ivalue)) {
- WritableUtils.cloneInto(key, jc.key());
- WritableUtils.cloneInto(value, emit(ivalue));
+ ReflectionUtils.copy(conf, jc.key(), key);
+ ReflectionUtils.copy(conf, emit(ivalue), value);
return true;
}
jc.clear();
@@ -87,20 +95,10 @@
return false;
}
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked") // Explicit check for value class agreement
- public V createValue() {
- if (null == valueclass) {
- final Class<?> cls = kids[0].createValue().getClass();
- for (RecordReader<K,? extends V> rr : kids) {
- if (!cls.equals(rr.createValue().getClass())) {
- throw new ClassCastException("Child value classes fail to agree");
- }
- }
- valueclass = cls.asSubclass(Writable.class);
- ivalue = createInternalValue();
- }
- return (V) ReflectionUtils.newInstance(valueclass, null);
+ @SuppressWarnings("unchecked")
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ super.initialize(split, context);
}
/**
@@ -124,13 +122,13 @@
public boolean next(V val) throws IOException {
boolean ret;
if (ret = jc.flush(ivalue)) {
- WritableUtils.cloneInto(val, emit(ivalue));
+ ReflectionUtils.copy(getConf(), emit(ivalue), val);
}
return ret;
}
public boolean replay(V val) throws IOException {
- WritableUtils.cloneInto(val, emit(ivalue));
+ ReflectionUtils.copy(getConf(), emit(ivalue), val);
return true;
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OuterJoinRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,21 +16,21 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
/**
* Full outer join.
*/
-public class OuterJoinRecordReader<K extends WritableComparable>
+public class OuterJoinRecordReader<K extends WritableComparable<?>>
extends JoinRecordReader<K> {
- OuterJoinRecordReader(int id, JobConf conf, int capacity,
+ OuterJoinRecordReader(int id, Configuration conf, int capacity,
Class<? extends WritableComparator> cmpcl) throws IOException {
super(id, conf, capacity, cmpcl);
}
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/OverrideRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,16 +16,18 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Prefer the "rightmost" data source for this key.
@@ -33,14 +35,15 @@
* from S3 over S2, and values from S2 over S1 for all keys
* emitted from all sources.
*/
-public class OverrideRecordReader<K extends WritableComparable,
+public class OverrideRecordReader<K extends WritableComparable<?>,
V extends Writable>
extends MultiFilterRecordReader<K,V> {
- OverrideRecordReader(int id, JobConf conf, int capacity,
+ OverrideRecordReader(int id, Configuration conf, int capacity,
Class<? extends WritableComparator> cmpcl) throws IOException {
super(id, conf, capacity, cmpcl);
}
+ private Class<? extends Writable> valueclass = null;
/**
* Emit the value with the highest position in the tuple.
@@ -50,6 +53,21 @@
return (V) dst.iterator().next();
}
+ /**
+ * Create a value object of the class emitted by this reader.
+ * The class is taken from the rightmost child whose createValue() result
+ * is not a NullWritable and is cached in valueclass after the first call.
+ * @return the NullWritable singleton when every child yields NullWritable
+ * values, otherwise a new instance of the cached value class.
+ */
+ @SuppressWarnings("unchecked") // valueclass is only known at runtime, so the cast to V is unchecked
+ public V createValue() {
+ if (null == valueclass) {
+ Class<?> cls = kids[kids.length - 1].createValue().getClass();
+ // Scan the remaining children right to left. The i >= 0 bound stops the
+ // scan at the first child, so sources that all yield NullWritable fall
+ // through to the NullWritable branch below instead of indexing kids[-1].
+ for (int i = kids.length - 2; i >= 0 && cls.equals(NullWritable.class); i--) {
+ cls = kids[i].createValue().getClass();
+ }
+ valueclass = cls.asSubclass(Writable.class);
+ }
+ if (valueclass.equals(NullWritable.class)) {
+ return (V) NullWritable.get();
+ }
+ return (V) ReflectionUtils.newInstance(valueclass, null);
+ }
+
/**
* Instead of filling the JoinCollector with iterators from all
* data sources, fill only the rightmost for this key.
@@ -59,9 +77,11 @@
* n is the cardinality of the cross product of the discarded
* streams for the given key.
*/
- protected void fillJoinCollector(K iterkey) throws IOException {
- final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
- if (!q.isEmpty()) {
+ protected void fillJoinCollector(K iterkey)
+ throws IOException, InterruptedException {
+ final PriorityQueue<ComposableRecordReader<K,?>> q =
+ getRecordReaderQueue();
+ if (q != null && !q.isEmpty()) {
int highpos = -1;
ArrayList<ComposableRecordReader<K,?>> list =
new ArrayList<ComposableRecordReader<K,?>>(kids.length);