Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/12/07 23:01:35 UTC
svn commit: r602240 [2/2] - in /lucene/hadoop/trunk: ./
src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/join/
src/test/org/apache/hadoop/mapred/join/
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An implementation of ResetableIterator that serializes elements added
+ * to it into a byte array and replays them on each reset.
+ */
+public class StreamBackedIterator<X extends Writable>
+ implements ResetableIterator<X> {
+
+ private static class ReplayableByteInputStream extends ByteArrayInputStream {
+ public ReplayableByteInputStream(byte[] arr) {
+ super(arr);
+ }
+ public void resetStream() {
+ mark = 0;
+ reset();
+ }
+ }
+
+  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream(); // write-side buffer
+  private DataOutputStream outfbuf = new DataOutputStream(outbuf);    // Writable view of outbuf; null while reading
+  private ReplayableByteInputStream inbuf;                            // read-side buffer, built from outbuf on reset()
+  private DataInputStream infbuf;                                     // Writable view of inbuf
+
+ public StreamBackedIterator() { }
+
+ public boolean hasNext() {
+ return infbuf != null && inbuf.available() > 0;
+ }
+
+ public boolean next(X val) throws IOException {
+ if (hasNext()) {
+ inbuf.mark(0);
+ val.readFields(infbuf);
+ return true;
+ }
+ return false;
+ }
+
+ public void replay(X val) throws IOException {
+ inbuf.reset();
+ val.readFields(infbuf);
+ }
+
+ public void reset() {
+ if (null != outfbuf) {
+ inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
+ infbuf = new DataInputStream(inbuf);
+ outfbuf = null;
+ }
+ inbuf.resetStream();
+ }
+
+ public void add(X item) throws IOException {
+ item.write(outfbuf);
+ }
+
+ public void close() throws IOException {
+ if (null != infbuf)
+ infbuf.close();
+ if (null != outfbuf)
+ outfbuf.close();
+ }
+
+ public void clear() {
+ if (null != inbuf)
+ inbuf.resetStream();
+ outbuf.reset();
+ outfbuf = new DataOutputStream(outbuf);
+ }
+}
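
A minimal sketch of the intended write-then-replay cycle, using only the
methods above (IntWritable stands in for any Writable): add() serializes
into the byte buffer, reset() freezes the buffer and switches to reading,
next() deserializes the next element, and replay() re-reads the element
at the last mark.

    StreamBackedIterator<IntWritable> it = new StreamBackedIterator<IntWritable>();
    for (int i = 0; i < 3; ++i) {
      it.add(new IntWritable(i));   // serialize into the write buffer
    }
    it.reset();                     // switch from writing to reading
    IntWritable val = new IntWritable();
    while (it.next(val)) {          // deserialize the next element
      it.replay(val);               // re-read the same element from the mark
    }
    it.close();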
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
+ */
+public class TupleWritable implements Writable, Iterable<Writable> {
+
+ private long written;
+ private Writable[] values;
+
+ /**
+ * Create an empty tuple with no allocated storage for writables.
+ */
+ public TupleWritable() { }
+
+ /**
+   * Initialize tuple with the given storage. No position is marked as
+   * "written", whatever the contents of the supplied writables.
+ */
+ public TupleWritable(Writable[] vals) {
+ written = 0L;
+ values = vals;
+ }
+
+ /**
+ * Return true if tuple has an element at the position provided.
+ */
+ public boolean has(int i) {
+    return 0 != ((1L << i) & written);
+ }
+
+ /**
+ * Get ith Writable from Tuple.
+ */
+ public Writable get(int i) {
+ return values[i];
+ }
+
+ /**
+ * The number of children in this Tuple.
+ */
+ public int size() {
+ return values.length;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean equals(Object other) {
+ if (other instanceof TupleWritable) {
+ TupleWritable that = (TupleWritable)other;
+ if (this.size() != that.size() || this.mask() != that.mask()) {
+ return false;
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (!has(i)) continue;
+ if (!values[i].equals(that.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return (int)written;
+ }
+
+ /**
+ * Return an iterator over the elements in this tuple.
+ * Note that this doesn't flatten the tuple; one may receive tuples
+ * from this iterator.
+ */
+ public Iterator<Writable> iterator() {
+ final TupleWritable t = this;
+ return new Iterator<Writable>() {
+ long i = written;
+ long last = 0L;
+ public boolean hasNext() {
+ return 0L != i;
+ }
+ public Writable next() {
+ last = Long.lowestOneBit(i);
+ if (0 == last)
+ throw new NoSuchElementException();
+ i ^= last;
+      // Long.numberOfTrailingZeros returns 64 only for an argument of 0,
+      // which the check above excludes; the % 64 is purely defensive
+      return t.get(Long.numberOfTrailingZeros(last) % 64);
+ }
+ public void remove() {
+ t.written ^= last;
+ if (t.has(Long.numberOfTrailingZeros(last))) {
+ throw new IllegalStateException("Attempt to remove non-existent val");
+ }
+ }
+ };
+ }
+
+ /**
+ * Convert Tuple to String as in the following.
+   * <tt>[&lt;child1&gt;,&lt;child2&gt;,...,&lt;childn&gt;]</tt>
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer("[");
+ for (int i = 0; i < values.length; ++i) {
+ buf.append(has(i) ? values[i].toString() : "");
+ buf.append(",");
+ }
+ if (values.length != 0)
+ buf.setCharAt(buf.length() - 1, ']');
+ else
+ buf.append(']');
+ return buf.toString();
+ }
+
+ // Writable
+
+ /** Writes each Writable to <code>out</code>.
+ * TupleWritable format:
+ * {@code
+ * <count><type1><type2>...<typen><obj1><obj2>...<objn>
+ * }
+ */
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, values.length);
+ WritableUtils.writeVLong(out, written);
+ for (int i = 0; i < values.length; ++i) {
+ Text.writeString(out, values[i].getClass().getName());
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (has(i)) {
+ values[i].write(out);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+ public void readFields(DataInput in) throws IOException {
+ int card = WritableUtils.readVInt(in);
+ values = new Writable[card];
+ written = WritableUtils.readVLong(in);
+ Class<? extends Writable>[] cls = new Class[card];
+ try {
+ for (int i = 0; i < card; ++i) {
+ cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+ }
+ for (int i = 0; i < card; ++i) {
+ values[i] = cls[i].newInstance();
+ if (has(i)) {
+ values[i].readFields(in);
+ }
+ }
+ } catch (ClassNotFoundException e) {
+ throw (IOException)new IOException("Failed tuple init").initCause(e);
+ } catch (IllegalAccessException e) {
+ throw (IOException)new IOException("Failed tuple init").initCause(e);
+ } catch (InstantiationException e) {
+ throw (IOException)new IOException("Failed tuple init").initCause(e);
+ }
+ }
+
+ /**
+ * Record that the tuple contains an element at the position provided.
+ */
+ void setWritten(int i) {
+    written |= 1L << i;
+ }
+
+ /**
+ * Record that the tuple does not contain an element at the position
+ * provided.
+ */
+ void clearWritten(int i) {
+    written &= ~(1L << i);
+ }
+
+ /**
+ * Clear any record of which writables have been written to, without
+ * releasing storage.
+ */
+ void clearWritten() {
+ written = 0L;
+ }
+
+ /**
+   * Return a bitmap recording which of the writables have been
+   * written to.
+ */
+ long mask() {
+ return written;
+ }
+
+}
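
Since written is a plain long bitmask, the has/setWritten/clearWritten
trio above reduces to bit arithmetic; a worked sketch (note the 1L
literals, which keep the shifts in 64-bit arithmetic for positions past
31 and imply a cap of 64 tracked positions per tuple):

    long written = 0L;
    written |= 1L << 0;                             // setWritten(0)
    written |= 1L << 2;                             // setWritten(2)
    boolean hasSlot1 = 0 != ((1L << 1) & written);  // false: never written
    written &= ~(1L << 2);                          // clearWritten(2)
    // written == 1L: only position 0 remains marked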
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Proxy class for a RecordReader participating in the join framework.
+ * This class keeps track of the "head" key-value pair for the
+ * provided RecordReader and keeps a store of values matching a key when
+ * this source is participating in a join.
+ */
+class WrappedRecordReader<K extends WritableComparable,
+ U extends Writable>
+ implements ComposableRecordReader<K,U> {
+
+ private boolean empty = false;
+ private RecordReader<K,U> rr;
+ private int id; // index at which values will be inserted in collector
+
+ private K khead; // key at the top of this RR
+ private U vhead; // value assoc with khead
+ private WritableComparator cmp;
+
+ private ResetableIterator<U> vjoin;
+
+ /**
+ * For a given RecordReader rr, occupy position id in collector.
+ */
+ WrappedRecordReader(int id, RecordReader<K,U> rr,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ this.id = id;
+ this.rr = rr;
+ khead = rr.createKey();
+ vhead = rr.createValue();
+ try {
+ cmp = (null == cmpcl)
+ ? WritableComparator.get(khead.getClass())
+ : cmpcl.newInstance();
+ } catch (InstantiationException e) {
+ throw (IOException)new IOException().initCause(e);
+ } catch (IllegalAccessException e) {
+ throw (IOException)new IOException().initCause(e);
+ }
+ vjoin = new StreamBackedIterator<U>();
+ next();
+ }
+
+ /** {@inheritDoc} */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * Return the key at the head of this RR.
+ */
+ public K key() {
+ return khead;
+ }
+
+ /**
+ * Clone the key at the head of this RR into the object supplied.
+ */
+ public void key(K qkey) throws IOException {
+ WritableUtils.cloneInto(qkey, khead);
+ }
+
+ /**
+   * Return true unless the RR (including the k,v pair stored in this
+   * object) is exhausted.
+ */
+ public boolean hasNext() {
+ return !empty;
+ }
+
+ /**
+ * Skip key-value pairs with keys less than or equal to the key provided.
+ */
+ public void skip(K key) throws IOException {
+ if (hasNext()) {
+ while (cmp.compare(khead, key) <= 0 && next());
+ }
+ }
+
+ /**
+   * Read the next k,v pair into the head of this object; return true
+   * unless the RR and this are exhausted.
+ */
+ protected boolean next() throws IOException {
+ empty = !rr.next(khead, vhead);
+ return hasNext();
+ }
+
+ /**
+ * Add an iterator to the collector at the position occupied by this
+ * RecordReader over the values in this stream paired with the key
+ * provided (ie register a stream of values from this source matching K
+ * with a collector).
+ */
+  // JoinCollector comes from the parent, which has no static type for
+  // the slot this sits in
+  @SuppressWarnings("unchecked")
+ public void accept(CompositeRecordReader.JoinCollector i, K key)
+ throws IOException {
+ vjoin.clear();
+ if (0 == cmp.compare(key, khead)) {
+ do {
+ vjoin.add(vhead);
+ } while (next() && 0 == cmp.compare(key, khead));
+ }
+ i.add(id, vjoin);
+ }
+
+ /**
+ * Write key-value pair at the head of this stream to the objects provided;
+ * get next key-value pair from proxied RR.
+ */
+ public boolean next(K key, U value) throws IOException {
+ if (hasNext()) {
+ WritableUtils.cloneInto(key, khead);
+ WritableUtils.cloneInto(value, vhead);
+ next();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Request new key from proxied RR.
+ */
+ public K createKey() {
+ return rr.createKey();
+ }
+
+ /**
+ * Request new value from proxied RR.
+ */
+ public U createValue() {
+ return rr.createValue();
+ }
+
+ /**
+ * Request progress from proxied RR.
+ */
+ public float getProgress() throws IOException {
+ return rr.getProgress();
+ }
+
+ /**
+ * Request position from proxied RR.
+ */
+ public long getPos() throws IOException {
+ return rr.getPos();
+ }
+
+ /**
+ * Forward close request to proxied RR.
+ */
+ public void close() throws IOException {
+ rr.close();
+ }
+
+ /**
+ * Implement Comparable contract (compare key at head of proxied RR
+ * with that of another).
+ */
+ public int compareTo(ComposableRecordReader<K,?> other) {
+ return cmp.compare(key(), other.key());
+ }
+
+ /**
+   * Return true iff compareTo(other) returns 0.
+ */
+ @SuppressWarnings("unchecked") // Explicit type check prior to cast
+ public boolean equals(Object other) {
+ return other instanceof ComposableRecordReader
+ && 0 == compareTo((ComposableRecordReader)other);
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42;
+ }
+
+}
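
A short sketch of the head/skip contract implemented above, assuming
ComposableRecordReader (defined elsewhere in this patch) declares the
skip(), hasNext(), and key() methods seen here, and that the underlying
input is sorted by key:

    static void advancePast(ComposableRecordReader<IntWritable,?> rr, int bound)
        throws IOException {
      rr.skip(new IntWritable(bound));  // drop every pair with key <= bound
      if (rr.hasNext()) {
        assert rr.key().get() > bound;  // head key is now strictly greater
      }
    }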
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html Fri Dec 7 14:01:32 2007
@@ -0,0 +1,88 @@
+<HTML>
+
+<BODY>
+
+<p>Given a set of sorted datasets keyed with the same class and yielding equal
+partitions, it is possible to effect a join of those datasets prior to the map.
+This could save costs in re-partitioning, sorting, shuffling, and writing out
+data required in the general case.</p>
+
+<h3><a name="Interface"></a>Interface</h3>
+
+<p>The attached code offers the following interface to users of these
+classes.</p>
+
+<table>
+<tr><th>property</th><th>required</th><th>value</th></tr>
+<tr><td>mapred.join.expr</td><td>yes</td>
+ <td>Join expression to effect over input data</td></tr>
+<tr><td>mapred.join.keycomparator</td><td>no</td>
+ <td><tt>WritableComparator</tt> class to use for comparing keys</td></tr>
+<tr><td>mapred.join.define.&lt;ident&gt;</td><td>no</td>
+ <td>Class mapped to identifier in join expression</td></tr>
+</table>
+
+<p>The join expression understands the following grammar:</p>
+
+<pre>func ::= &lt;ident&gt;([&lt;func&gt;,]*&lt;func&gt;)
+func ::= tbl(&lt;class&gt;,"&lt;path&gt;")
+</pre>
+
+<p>Operations included in this patch fall into one of two types: join
+operations emitting tuples, and "multi-filter" operations emitting a single
+value derived from (but not necessarily contained in) a set of input values.
+For a given key, each operation will consider the cross product of all
+values for all sources at that node.</p>
+
+<p>Identifiers supported by default:</p>
+
+<table>
+<tr><th>identifier</th><th>type</th><th>description</th></tr>
+<tr><td>inner</td><td>Join</td><td>Full inner join</td></tr>
+<tr><td>outer</td><td>Join</td><td>Full outer join</td></tr>
+<tr><td>override</td><td>MultiFilter</td>
+ <td>For a given key, prefer values from the rightmost source</td></tr>
+</table>
+
+<p>A user of this class must set the <tt>InputFormat</tt> for the job to
+<tt>CompositeInputFormat</tt> and define a join expression accepted by the
+preceding grammar. For example, both of the following are acceptable:</p>
+
+<pre>inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+ "hdfs://host:8020/foo/bar"),
+ tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+ "hdfs://host:8020/foo/baz"))
+
+outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+ "hdfs://host:8020/foo/bar"),
+ tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+ "hdfs://host:8020/foo/baz")),
+      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+ "hdfs://host:8020/foo/rab"))
+</pre>
+
+<p><tt>CompositeInputFormat</tt> includes a handful of convenience methods to
+aid construction of these verbose statements.</p>
+
+<p>As in the second example, joins may be nested. Users may provide a
+comparator class in the <tt>mapred.join.keycomparator</tt> property to specify
+the ordering of their keys, or accept the default comparator as returned by
+<tt>WritableComparator.get(keyclass)</tt>.</p>
+
+<p>Users can specify their own join operations, typically by overriding
+<tt>JoinRecordReader</tt> or <tt>MultiFilterRecordReader</tt> and mapping that
+class to an identifier in the join expression using the
+<tt>mapred.join.define.<em>ident</em></tt> property, where <em>ident</em> is
+the identifier appearing in the join expression. Users may elect to emit or
+modify values passing through their join operation. Consulting the existing
+operations for guidance is recommended. Adding arguments is considerably more
+complex (and only partially supported), as one must also add a <tt>Node</tt>
+type to the parse tree. One is probably better off extending
+<tt>RecordReader</tt> in most cases.</p>
+
+<a href="http://issues.apache.org/jira/browse/HADOOP-2085">JIRA</a>
+
+</BODY>
+
+</HTML>
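
As a concrete companion to the documentation above, this is how a job can
be wired to the framework (the same pattern TestDatamerge uses below; the
paths and mapper class are illustrative only):

    JobConf job = new JobConf(conf, MyMapper.class);  // MyMapper: hypothetical
    Path[] src = { new Path("hdfs://host:8020/foo/bar"),
                   new Path("hdfs://host:8020/foo/baz") };
    job.set("mapred.join.expr",
        CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, src));
    job.setInputFormat(CompositeInputFormat.class);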
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+public class TestDatamerge extends TestCase {
+
+ private static MiniDFSCluster cluster = null;
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ }
+ protected void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ };
+ return setup;
+ }
+
+ private static SequenceFile.Writer[] createWriters(Path testdir,
+ Configuration conf, int srcs, Path[] src) throws IOException {
+ for (int i = 0; i < srcs; ++i) {
+ src[i] = new Path(testdir, Integer.toString(i + 10, 36));
+ }
+    SequenceFile.Writer[] out = new SequenceFile.Writer[srcs];
+ for (int i = 0; i < srcs; ++i) {
+ out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
+ src[i], IntWritable.class, IntWritable.class);
+ }
+ return out;
+ }
+
+ private static Path[] writeSimpleSrc(Path testdir, Configuration conf,
+ int srcs) throws IOException {
+    SequenceFile.Writer[] out = null;
+ Path[] src = new Path[srcs];
+ try {
+ out = createWriters(testdir, conf, srcs, src);
+ final int capacity = srcs * 2 + 1;
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+ for (int k = 0; k < capacity; ++k) {
+ for (int i = 0; i < srcs; ++i) {
+ key.set(k % srcs == 0 ? k * srcs : k * srcs + i);
+ val.set(10 * k + i);
+ out[i].append(key, val);
+ if (i == k) {
+ // add duplicate key
+ out[i].append(key, val);
+ }
+ }
+ }
+ } finally {
+ if (out != null) {
+ for (int i = 0; i < srcs; ++i) {
+ if (out[i] != null)
+ out[i].close();
+ }
+ }
+ }
+ return src;
+ }
+
+ private static String stringify(IntWritable key, Writable val) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(" + key);
+ sb.append("," + val + ")");
+ return sb.toString();
+ }
+
+ private static abstract class SimpleCheckerBase<V extends Writable>
+ implements Mapper<IntWritable, V, IntWritable, IntWritable>,
+ Reducer<IntWritable, IntWritable, Text, Text> {
+ protected final static IntWritable one = new IntWritable(1);
+ int srcs;
+ public void close() { }
+ public void configure(JobConf job) {
+ srcs = job.getInt("testdatamerge.sources", 0);
+ assertTrue("Invalid src count: " + srcs, srcs > 0);
+ }
+ public abstract void map(IntWritable key, V val,
+ OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+ throws IOException;
+ public void reduce(IntWritable key, Iterator<IntWritable> values,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ int seen = 0;
+ while (values.hasNext()) {
+ seen += values.next().get();
+ }
+ assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
+ }
+ public abstract boolean verify(int key, int occ);
+ }
+
+ private static class InnerJoinChecker
+ extends SimpleCheckerBase<TupleWritable> {
+ public void map(IntWritable key, TupleWritable val,
+ OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+ throws IOException {
+ int k = key.get();
+ final String kvstr = "Unexpected tuple: " + stringify(key, val);
+ assertTrue(kvstr, 0 == k % (srcs * srcs));
+ for (int i = 0; i < val.size(); ++i) {
+ final int vali = ((IntWritable)val.get(i)).get();
+ assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+ }
+ out.collect(key, one);
+ }
+ public boolean verify(int key, int occ) {
+ return (key == 0 && occ == 2) ||
+ (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
+ }
+ }
+
+ private static class OuterJoinChecker
+ extends SimpleCheckerBase<TupleWritable> {
+ public void map(IntWritable key, TupleWritable val,
+ OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+ throws IOException {
+ int k = key.get();
+ final String kvstr = "Unexpected tuple: " + stringify(key, val);
+ if (0 == k % (srcs * srcs)) {
+ for (int i = 0; i < val.size(); ++i) {
+ assertTrue(kvstr, val.get(i) instanceof IntWritable);
+ final int vali = ((IntWritable)val.get(i)).get();
+ assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+ }
+ } else {
+ for (int i = 0; i < val.size(); ++i) {
+ if (i == k % srcs) {
+ assertTrue(kvstr, val.get(i) instanceof IntWritable);
+ final int vali = ((IntWritable)val.get(i)).get();
+ assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+ } else {
+ assertTrue(kvstr, !val.has(i));
+ }
+ }
+ }
+ out.collect(key, one);
+ }
+ public boolean verify(int key, int occ) {
+ if (key < srcs * srcs && (key % (srcs + 1)) == 0)
+ return 2 == occ;
+ return 1 == occ;
+ }
+ }
+
+ private static class OverrideChecker
+ extends SimpleCheckerBase<IntWritable> {
+ public void map(IntWritable key, IntWritable val,
+ OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+ throws IOException {
+ int k = key.get();
+ final int vali = val.get();
+ final String kvstr = "Unexpected tuple: " + stringify(key, val);
+ if (0 == k % (srcs * srcs)) {
+ assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
+ } else {
+ final int i = k % srcs;
+ assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+ }
+ out.collect(key, one);
+ }
+ public boolean verify(int key, int occ) {
+ if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
+ return 2 == occ;
+ return 1 == occ;
+ }
+ }
+
+ private static void joinAs(String jointype,
+ Class<? extends SimpleCheckerBase> c) throws Exception {
+ final int srcs = 4;
+ Configuration conf = new Configuration();
+ JobConf job = new JobConf(conf, c);
+ Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
+ Path[] src = writeSimpleSrc(base, conf, srcs);
+ job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
+ SequenceFileInputFormat.class, src));
+ job.setInt("testdatamerge.sources", srcs);
+ job.setInputFormat(CompositeInputFormat.class);
+ job.setOutputPath(new Path(base, "out"));
+
+ job.setMapperClass(c);
+ job.setReducerClass(c);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(IntWritable.class);
+ JobClient.runJob(job);
+ base.getFileSystem(job).delete(base);
+ }
+
+ public void testSimpleInnerJoin() throws Exception {
+ joinAs("inner", InnerJoinChecker.class);
+ }
+
+ public void testSimpleOuterJoin() throws Exception {
+ joinAs("outer", OuterJoinChecker.class);
+ }
+
+ public void testSimpleOverride() throws Exception {
+ joinAs("override", OverrideChecker.class);
+ }
+}
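
For reference, a hand-derived trace of writeSimpleSrc for srcs = 4
(capacity = 9), which motivates the assertions in the checkers above:

    // k = 0: k % srcs == 0, so every source writes key 0 (= k * srcs)
    // k = 1: source i writes key 4 + i (= k * srcs + i); only source 1
    //        also duplicates its pair, since i == k there
    // k = 4 and k = 8: every source writes keys 16 and 32 respectively
    // Keys present in all four sources are exactly the multiples of
    // srcs * srcs = 16, hence InnerJoinChecker's 0 == k % (srcs * srcs);
    // the duplicate of key 0 in source 0 (k == 0, i == 0) is why the
    // inner join sees key 0 twice (occ == 2 in verify).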
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Fri Dec 7 14:01:32 2007
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+public class TestTupleWritable extends TestCase {
+
+ private TupleWritable makeTuple(Writable[] writs) {
+ Writable[] sub1 = { writs[1], writs[2] };
+ Writable[] sub3 = { writs[4], writs[5] };
+ Writable[] sub2 = { writs[3], new TupleWritable(sub3), writs[6] };
+ Writable[] vals = { writs[0], new TupleWritable(sub1),
+ new TupleWritable(sub2), writs[7], writs[8],
+ writs[9] };
+ // [v0, [v1, v2], [v3, [v4, v5], v6], v7, v8, v9]
+ TupleWritable ret = new TupleWritable(vals);
+ for (int i = 0; i < 6; ++i) {
+ ret.setWritten(i);
+ }
+ ((TupleWritable)sub2[1]).setWritten(0);
+ ((TupleWritable)sub2[1]).setWritten(1);
+ ((TupleWritable)vals[1]).setWritten(0);
+ ((TupleWritable)vals[1]).setWritten(1);
+ for (int i = 0; i < 3; ++i) {
+ ((TupleWritable)vals[2]).setWritten(i);
+ }
+ return ret;
+ }
+
+ private int verifIter(Writable[] writs, TupleWritable t, int i) {
+ for (Writable w : t) {
+ if (w instanceof TupleWritable) {
+ i = verifIter(writs, ((TupleWritable)w), i);
+ continue;
+ }
+ assertTrue("Bad value", w.equals(writs[i++]));
+ }
+ return i;
+ }
+
+ public void testIterable() throws Exception {
+ Random r = new Random();
+ Writable[] writs = {
+ new BooleanWritable(r.nextBoolean()),
+ new FloatWritable(r.nextFloat()),
+ new FloatWritable(r.nextFloat()),
+ new IntWritable(r.nextInt()),
+ new LongWritable(r.nextLong()),
+ new BytesWritable("dingo".getBytes()),
+ new LongWritable(r.nextLong()),
+ new IntWritable(r.nextInt()),
+ new BytesWritable("yak".getBytes()),
+ new IntWritable(r.nextInt())
+ };
+ TupleWritable t = new TupleWritable(writs);
+ for (int i = 0; i < 6; ++i) {
+ t.setWritten(i);
+ }
+ verifIter(writs, t, 0);
+ }
+
+ public void testNestedIterable() throws Exception {
+ Random r = new Random();
+ Writable[] writs = {
+ new BooleanWritable(r.nextBoolean()),
+ new FloatWritable(r.nextFloat()),
+ new FloatWritable(r.nextFloat()),
+ new IntWritable(r.nextInt()),
+ new LongWritable(r.nextLong()),
+ new BytesWritable("dingo".getBytes()),
+ new LongWritable(r.nextLong()),
+ new IntWritable(r.nextInt()),
+ new BytesWritable("yak".getBytes()),
+ new IntWritable(r.nextInt())
+ };
+ TupleWritable sTuple = makeTuple(writs);
+ assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
+ }
+
+ public void testWritable() throws Exception {
+ Random r = new Random();
+ Writable[] writs = {
+ new BooleanWritable(r.nextBoolean()),
+ new FloatWritable(r.nextFloat()),
+ new FloatWritable(r.nextFloat()),
+ new IntWritable(r.nextInt()),
+ new LongWritable(r.nextLong()),
+ new BytesWritable("dingo".getBytes()),
+ new LongWritable(r.nextLong()),
+ new IntWritable(r.nextInt()),
+ new BytesWritable("yak".getBytes()),
+ new IntWritable(r.nextInt())
+ };
+ TupleWritable sTuple = makeTuple(writs);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ sTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+ }
+
+}