Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/12/07 23:01:35 UTC

svn commit: r602240 [2/2] - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/join/ src/test/org/apache/hadoop/mapred/join/

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class provides an implementation of ResetableIterator. Elements
+ * added to it are serialized into an internal byte array, from which they
+ * may be replayed repeatedly.
+ */
+public class StreamBackedIterator<X extends Writable>
+    implements ResetableIterator<X> {
+
+  private static class ReplayableByteInputStream extends ByteArrayInputStream {
+    public ReplayableByteInputStream(byte[] arr) {
+      super(arr);
+    }
+    public void resetStream() {
+      mark = 0;
+      reset();
+    }
+  }
+
+  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
+  private DataOutputStream outfbuf = new DataOutputStream(outbuf);
+  private ReplayableByteInputStream inbuf;
+  private DataInputStream infbuf;
+
+  public StreamBackedIterator() { }
+
+  public boolean hasNext() {
+    return infbuf != null && inbuf.available() > 0;
+  }
+
+  public boolean next(X val) throws IOException {
+    if (hasNext()) {
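+      // mark the stream so replay() can reset to and re-read this value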
+      inbuf.mark(0);
+      val.readFields(infbuf);
+      return true;
+    }
+    return false;
+  }
+
+  public void replay(X val) throws IOException {
+    inbuf.reset();
+    val.readFields(infbuf);
+  }
+
+  public void reset() {
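+    // The first reset() after a series of add() calls snapshots the write
+    // buffer into a replayable input stream; subsequent calls only rewind.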
+    if (null != outfbuf) {
+      inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
+      infbuf = new DataInputStream(inbuf);
+      outfbuf = null;
+    }
+    inbuf.resetStream();
+  }
+
+  public void add(X item) throws IOException {
+    item.write(outfbuf);
+  }
+
+  public void close() throws IOException {
+    if (null != infbuf)
+      infbuf.close();
+    if (null != outfbuf)
+      outfbuf.close();
+  }
+
+  public void clear() {
+    if (null != inbuf)
+      inbuf.resetStream();
+    outbuf.reset();
+    outfbuf = new DataOutputStream(outbuf);
+  }
+}
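
A minimal usage sketch for the iterator above (hypothetical driver code,
not part of this commit; assumes the usual org.apache.hadoop.io imports):

    StreamBackedIterator<IntWritable> it =
        new StreamBackedIterator<IntWritable>();
    it.add(new IntWritable(1));    // serialized into the internal buffer
    it.add(new IntWritable(2));
    it.reset();                    // switch from the write phase to reading
    IntWritable val = new IntWritable();
    while (it.next(val)) {         // yields 1, then 2
      System.out.println(val.get());
    }
    it.reset();                    // rewind; the stream may be replayed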

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
+ */
+public class TupleWritable implements Writable, Iterable<Writable> {
+
+  private long written;
+  private Writable[] values;
+
+  /**
+   * Create an empty tuple with no allocated storage for writables.
+   */
+  public TupleWritable() { }
+
+  /**
+   * Initialize the tuple with the provided storage; it is initially unknown
+   * whether any of the positions contain &quot;written&quot; values.
+   */
+  public TupleWritable(Writable[] vals) {
+    written = 0L;
+    values = vals;
+  }
+
+  /**
+   * Return true if tuple has an element at the position provided.
+   */
+  public boolean has(int i) {
+    return 0 != ((1L << i) & written);
+  }
+
+  /**
+   * Get ith Writable from Tuple.
+   */
+  public Writable get(int i) {
+    return values[i];
+  }
+
+  /**
+   * The number of children in this Tuple.
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean equals(Object other) {
+    if (other instanceof TupleWritable) {
+      TupleWritable that = (TupleWritable)other;
+      if (this.size() != that.size() || this.mask() != that.mask()) {
+        return false;
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (!has(i)) continue;
+        if (!values[i].equals(that.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return (int)written;
+  }
+
+  /**
+   * Return an iterator over the elements in this tuple.
+   * Note that this doesn't flatten the tuple; one may receive tuples
+   * from this iterator.
+   */
+  public Iterator<Writable> iterator() {
+    final TupleWritable t = this;
+    return new Iterator<Writable>() {
+      long i = written;
+      long last = 0L;
+      public boolean hasNext() {
+        return 0L != i;
+      }
+      public Writable next() {
+        last = Long.lowestOneBit(i);
+        if (0 == last)
+          throw new NoSuchElementException();
+        i ^= last;
+        // numberOfTrailingZeros returns 64 only for an argument of 0,
+        // which the check above excludes; the % 64 is a defensive no-op
+        return t.get(Long.numberOfTrailingZeros(last) % 64);
+      }
+      public void remove() {
+        // verify the bit is set before clearing it, so a bogus remove()
+        // cannot corrupt the written mask
+        if (!t.has(Long.numberOfTrailingZeros(last))) {
+          throw new IllegalStateException("Attempt to remove non-existent val");
+        }
+        t.written ^= last;
+      }
+    };
+  }
+
+  /**
+   * Convert Tuple to String as in the following:
+   * <tt>[&lt;child1&gt;,&lt;child2&gt;,...,&lt;childn&gt;]</tt>
+   */
+  public String toString() {
+    StringBuilder buf = new StringBuilder("[");
+    for (int i = 0; i < values.length; ++i) {
+      buf.append(has(i) ? values[i].toString() : "");
+      buf.append(",");
+    }
+    if (values.length != 0)
+      buf.setCharAt(buf.length() - 1, ']');
+    else
+      buf.append(']');
+    return buf.toString();
+  }
+
+  // Writable
+
+  /** Writes each written Writable to <code>out</code>.
+   * TupleWritable format:
+   * {@code
+   *  <count><mask><type1><type2>...<typen><obj1><obj2>...<objn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    WritableUtils.writeVLong(out, written);
+    for (int i = 0; i < values.length; ++i) {
+      Text.writeString(out, values[i].getClass().getName());
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        values[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    values = new Writable[card];
+    written = WritableUtils.readVLong(in);
+    Class<? extends Writable>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        values[i] = cls[i].newInstance();
+        if (has(i)) {
+          values[i].readFields(in);
+        }
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    } catch (InstantiationException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    }
+  }
+
+  /**
+   * Record that the tuple contains an element at the position provided.
+   */
+  void setWritten(int i) {
+    written |= 1L << i;
+  }
+
+  /**
+   * Record that the tuple does not contain an element at the position
+   * provided.
+   */
+  void clearWritten(int i) {
+    written &= ~(1L << i);
+  }
+
+  /**
+   * Clear any record of which writables have been written to, without
+   * releasing storage.
+   */
+  void clearWritten() {
+    written = 0L;
+  }
+
+  /**
+   * Return a bitmap recording which of the writables have been written to.
+   */
+  long mask() {
+    return written;
+  }
+
+}
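
Since the presence bitmask is a single long, a tuple can track the
"written" state of at most 64 positions. A small illustration of the
package-private accessors above (hypothetical code; it would need to live
in org.apache.hadoop.mapred.join to see these methods):

    Writable[] vals = { new IntWritable(7), new Text("x"),
                        new IntWritable(9) };
    TupleWritable t = new TupleWritable(vals);
    t.setWritten(0);           // mask = ...0001
    t.setWritten(2);           // mask = ...0101
    boolean b = t.has(1);      // false: position 1 was never written
    t.clearWritten(2);         // mask = ...0001
    System.out.println(t);     // prints [7,,]; unwritten slots print empty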

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Proxy class for a RecordReader participating in the join framework.
+ * This class keeps track of the &quot;head&quot; key-value pair for the
+ * provided RecordReader and keeps a store of values matching a key when
+ * this source is participating in a join.
+ */
+class WrappedRecordReader<K extends WritableComparable,
+                          U extends Writable>
+    implements ComposableRecordReader<K,U> {
+
+  private boolean empty = false;
+  private RecordReader<K,U> rr;
+  private int id;  // index at which values will be inserted in collector
+
+  private K khead; // key at the top of this RR
+  private U vhead; // value assoc with khead
+  private WritableComparator cmp;
+
+  private ResetableIterator<U> vjoin;
+
+  /**
+   * For a given RecordReader rr, occupy position id in collector.
+   */
+  WrappedRecordReader(int id, RecordReader<K,U> rr,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    this.id = id;
+    this.rr = rr;
+    khead = rr.createKey();
+    vhead = rr.createValue();
+    try {
+      cmp = (null == cmpcl)
+        ? WritableComparator.get(khead.getClass())
+        : cmpcl.newInstance();
+    } catch (InstantiationException e) {
+      throw (IOException)new IOException().initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException)new IOException().initCause(e);
+    }
+    vjoin = new StreamBackedIterator<U>();
+    next();
+  }
+
+  /** {@inheritDoc} */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * Return the key at the head of this RR.
+   */
+  public K key() {
+    return khead;
+  }
+
+  /**
+   * Clone the key at the head of this RR into the object supplied.
+   */
+  public void key(K qkey) throws IOException {
+    WritableUtils.cloneInto(qkey, khead);
+  }
+
+  /**
+   * Return true if the RR (including the k,v pair stored in this object)
+   * is not yet exhausted.
+   */
+  public boolean hasNext() {
+    return !empty;
+  }
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  public void skip(K key) throws IOException {
+    if (hasNext()) {
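+      // advance past every pair whose key compares <= the given key;
+      // next() returns false once the underlying reader is exhausted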
+      while (cmp.compare(khead, key) <= 0 && next());
+    }
+  }
+
+  /**
+   * Read the next k,v pair into the head of this object; return true iff
+   * the RR (and therefore this object) is not yet exhausted.
+   */
+  protected boolean next() throws IOException {
+    empty = !rr.next(khead, vhead);
+    return hasNext();
+  }
+
+  /**
+   * Add an iterator to the collector at the position occupied by this
+   * RecordReader over the values in this stream paired with the key
+   * provided (i.e. register a stream of values from this source matching
+   * the given key with a collector).
+   */
+                                 // JoinCollector comes from parent, which has
+  @SuppressWarnings("unchecked") // no static type for the slot this sits in
+  public void accept(CompositeRecordReader.JoinCollector i, K key)
+      throws IOException {
+    vjoin.clear();
+    if (0 == cmp.compare(key, khead)) {
+      do {
+        vjoin.add(vhead);
+      } while (next() && 0 == cmp.compare(key, khead));
+    }
+    i.add(id, vjoin);
+  }
+
+  /**
+   * Write key-value pair at the head of this stream to the objects provided;
+   * get next key-value pair from proxied RR.
+   */
+  public boolean next(K key, U value) throws IOException {
+    if (hasNext()) {
+      WritableUtils.cloneInto(key, khead);
+      WritableUtils.cloneInto(value, vhead);
+      next();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Request new key from proxied RR.
+   */
+  public K createKey() {
+    return rr.createKey();
+  }
+
+  /**
+   * Request new value from proxied RR.
+   */
+  public U createValue() {
+    return rr.createValue();
+  }
+
+  /**
+   * Request progress from proxied RR.
+   */
+  public float getProgress() throws IOException {
+    return rr.getProgress();
+  }
+
+  /**
+   * Request position from proxied RR.
+   */
+  public long getPos() throws IOException {
+    return rr.getPos();
+  }
+
+  /**
+   * Forward close request to proxied RR.
+   */
+  public void close() throws IOException {
+    rr.close();
+  }
+
+  /**
+   * Implement Comparable contract (compare key at head of proxied RR
+   * with that of another).
+   */
+  public int compareTo(ComposableRecordReader<K,?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Return true iff compareTo(other) returns 0.
+   */
+  @SuppressWarnings("unchecked") // Explicit type check prior to cast
+  public boolean equals(Object other) {
+    return other instanceof ComposableRecordReader
+        && 0 == compareTo((ComposableRecordReader)other);
+  }
+
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 42;
+  }
+
+}
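
Because each WrappedRecordReader orders itself by the key at its head, a
composite reader can merge several sources with a priority queue. A rough
sketch of that pattern (an illustration only, not the actual
CompositeRecordReader logic; "kids" is a hypothetical array of child
readers, and ComposableRecordReader is assumed Comparable by head key, as
compareTo above suggests):

    PriorityQueue<ComposableRecordReader<K,U>> q =
        new PriorityQueue<ComposableRecordReader<K,U>>(kids.length);
    for (ComposableRecordReader<K,U> rr : kids) {
      if (rr.hasNext()) q.add(rr);       // ordered by each reader's head key
    }
    K key = kids[0].createKey();
    U val = kids[0].createValue();
    while (!q.isEmpty()) {
      ComposableRecordReader<K,U> rr = q.poll(); // least head key wins
      rr.next(key, val);                 // copy the head pair out and advance
      // ... process (key, val) ...
      if (rr.hasNext()) {
        q.add(rr);                       // re-insert under its new head key
      }
    }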

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html Fri Dec  7 14:01:32 2007
@@ -0,0 +1,88 @@
+<HTML>
+
+<BODY>
+
+<p>Given a set of sorted datasets keyed with the same class and yielding equal
+partitions, it is possible to effect a join of those datasets prior to the map.
+This could save costs in re-partitioning, sorting, shuffling, and writing out
+data required in the general case.</p>
+
+<h3><a name="Interface"></a>Interface</h3>
+
+<p>The attached code offers the following interface to users of these
+classes.</p>
+
+<table>
+<tr><th>property</th><th>required</th><th>value</th></tr>
+<tr><td>mapred.join.expr</td><td>yes</td>
+    <td>Join expression to effect over input data</td></tr>
+<tr><td>mapred.join.keycomparator</td><td>no</td>
+    <td><tt>WritableComparator</tt> class to use for comparing keys</td></tr>
+<tr><td>mapred.join.define.&lt;ident&gt;</td><td>no</td>
+    <td>Class mapped to identifier in join expression</td></tr>
+</table>
+
+<p>The join expression understands the following grammar:</p>
+
+<pre>func ::= &lt;ident&gt;([&lt;func&gt;,]*&lt;func&gt;)
+func ::= tbl(&lt;class&gt;,"&lt;path&gt;")
+</pre>
+
+<p>Operations included in this patch fall into one of two types: join
+operations emitting tuples, and "multi-filter" operations emitting a
+single value derived from (but not necessarily included in) a set of input
+values. For a given key, each operation will consider the cross product of
+all values for all sources at that node.</p>
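+
+<p>For example, if for some key <tt>k</tt> source A holds the values
+<tt>{a1, a2}</tt> and source B holds <tt>{b1}</tt>, an inner join of A and
+B emits the tuples <tt>[a1,b1]</tt> and <tt>[a2,b1]</tt> for <tt>k</tt>.</p>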
+
+<p>Identifiers supported by default:</p>
+
+<table>
+<tr><th>identifier</th><th>type</th><th>description</th></tr>
+<tr><td>inner</td><td>Join</td><td>Full inner join</td></tr>
+<tr><td>outer</td><td>Join</td><td>Full outer join</td></tr>
+<tr><td>override</td><td>MultiFilter</td>
+    <td>For a given key, prefer values from the rightmost source</td></tr>
+</table>
+
+<p>A user of this class must set the <tt>InputFormat</tt> for the job to
+<tt>CompositeInputFormat</tt> and define a join expression accepted by the
+preceding grammar. For example, both of the following are acceptable:</p>
+
+<pre>inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/bar"),
+      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/baz"))
+
+outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+                   "hdfs://host:8020/foo/bar"),
+               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+                   "hdfs://host:8020/foo/baz")),
+      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/rab"))
+</pre>
+
+<p><tt>CompositeInputFormat</tt> includes a handful of convenience methods to
+aid construction of these verbose statements.</p>
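+
+<p>For instance, the first expression above could presumably be built with
+something like:</p>
+
+<pre>conf.set("mapred.join.expr", CompositeInputFormat.compose("inner",
+    SequenceFileInputFormat.class,
+    new Path("hdfs://host:8020/foo/bar"),
+    new Path("hdfs://host:8020/foo/baz")));
+</pre>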
+
+<p>As in the second example, joins may be nested. Users may provide a
+comparator class in the <tt>mapred.join.keycomparator</tt> property to specify
+the ordering of their keys, or accept the default comparator as returned by
+<tt>WritableComparator.get(keyclass)</tt>.</p>
+
+<p>Users can specify their own join operations, typically by overriding
+<tt>JoinRecordReader</tt> or <tt>MultiFilterRecordReader</tt> and mapping that
+class to an identifier in the join expression using the
+<tt>mapred.join.define.<em>ident</em></tt> property, where <em>ident</em> is
+the identifier appearing in the join expression. Users may elect to emit or
+modify values passing through their join operation; consulting the existing
+operations for guidance is recommended. Adding arguments is considerably more
+complex (and only partially supported), as one must also add a <tt>Node</tt>
+type to the parse tree. One is probably better off extending
+<tt>RecordReader</tt> in most cases.</p>
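+
+<p>A hypothetical registration of such an operation (property names as in
+the table above; the class and expression are illustrative only):</p>
+
+<pre>conf.set("mapred.join.define.myop", MyJoinRecordReader.class.getName());
+conf.set("mapred.join.expr",
+    "myop(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,\"/a\"),"
+    + "tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,\"/b\"))");
+</pre>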
+
+<a href="http://issues.apache.org/jira/browse/HADOOP-2085">JIRA</a>
+
+</BODY>
+
+</HTML>

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+public class TestDatamerge extends TestCase {
+
+  private static MiniDFSCluster cluster = null;
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        cluster = new MiniDFSCluster(conf, 2, true, null);
+      }
+      protected void tearDown() throws Exception {
+        if (cluster != null) {
+          cluster.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+
+  private static SequenceFile.Writer[] createWriters(Path testdir,
+      Configuration conf, int srcs, Path[] src) throws IOException {
+    for (int i = 0; i < srcs; ++i) {
+      src[i] = new Path(testdir, Integer.toString(i + 10, 36));
+    }
+    SequenceFile.Writer[] out = new SequenceFile.Writer[srcs];
+    for (int i = 0; i < srcs; ++i) {
+      out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
+          src[i], IntWritable.class, IntWritable.class);
+    }
+    return out;
+  }
+
+  private static Path[] writeSimpleSrc(Path testdir, Configuration conf,
+      int srcs) throws IOException {
+    SequenceFile.Writer[] out = null;
+    Path[] src = new Path[srcs];
+    try {
+      out = createWriters(testdir, conf, srcs, src);
+      final int capacity = srcs * 2 + 1;
+      IntWritable key = new IntWritable();
+      IntWritable val = new IntWritable();
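+      // Keys are laid out so that when k % srcs == 0 every source emits
+      // the same key (k * srcs, hence divisible by srcs * srcs and
+      // joinable across all sources); otherwise source i emits the
+      // distinct key k * srcs + i.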
+      for (int k = 0; k < capacity; ++k) {
+        for (int i = 0; i < srcs; ++i) {
+          key.set(k % srcs == 0 ? k * srcs : k * srcs + i);
+          val.set(10 * k + i);
+          out[i].append(key, val);
+          if (i == k) {
+            // add duplicate key
+            out[i].append(key, val);
+          }
+        }
+      }
+    } finally {
+      if (out != null) {
+        for (int i = 0; i < srcs; ++i) {
+          if (out[i] != null)
+            out[i].close();
+        }
+      }
+    }
+    return src;
+  }
+
+  private static String stringify(IntWritable key, Writable val) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(" + key);
+    sb.append("," + val + ")");
+    return sb.toString();
+  }
+
+  private static abstract class SimpleCheckerBase<V extends Writable>
+      implements Mapper<IntWritable, V, IntWritable, IntWritable>,
+                 Reducer<IntWritable, IntWritable, Text, Text> {
+    protected final static IntWritable one = new IntWritable(1);
+    int srcs;
+    public void close() { }
+    public void configure(JobConf job) {
+      srcs = job.getInt("testdatamerge.sources", 0);
+      assertTrue("Invalid src count: " + srcs, srcs > 0);
+    }
+    public abstract void map(IntWritable key, V val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException;
+    public void reduce(IntWritable key, Iterator<IntWritable> values,
+                       OutputCollector<Text, Text> output,
+                       Reporter reporter) throws IOException {
+      int seen = 0;
+      while (values.hasNext()) {
+        seen += values.next().get();
+      }
+      assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
+    }
+    public abstract boolean verify(int key, int occ);
+  }
+
+  private static class InnerJoinChecker
+      extends SimpleCheckerBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      assertTrue(kvstr, 0 == k % (srcs * srcs));
+      for (int i = 0; i < val.size(); ++i) {
+        final int vali = ((IntWritable)val.get(i)).get();
+        assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      return (key == 0 && occ == 2) ||
+             (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
+    }
+  }
+
+  private static class OuterJoinChecker
+      extends SimpleCheckerBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      if (0 == k % (srcs * srcs)) {
+        for (int i = 0; i < val.size(); ++i) {
+          assertTrue(kvstr, val.get(i) instanceof IntWritable);
+          final int vali = ((IntWritable)val.get(i)).get();
+          assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+        }
+      } else {
+        for (int i = 0; i < val.size(); ++i) {
+          if (i == k % srcs) {
+            assertTrue(kvstr, val.get(i) instanceof IntWritable);
+            final int vali = ((IntWritable)val.get(i)).get();
+            assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+          } else {
+            assertTrue(kvstr, !val.has(i));
+          }
+        }
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0)
+        return 2 == occ;
+      return 1 == occ;
+    }
+  }
+
+  private static class OverrideChecker
+      extends SimpleCheckerBase<IntWritable> {
+    public void map(IntWritable key, IntWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final int vali = val.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      if (0 == k % (srcs * srcs)) {
+        assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
+      } else {
+        final int i = k % srcs;
+        assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
+        return 2 == occ;
+      return 1 == occ;
+    }
+  }
+
+  private static void joinAs(String jointype,
+      Class<? extends SimpleCheckerBase> c) throws Exception {
+    final int srcs = 4;
+    Configuration conf = new Configuration();
+    JobConf job = new JobConf(conf, c);
+    Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
+    Path[] src = writeSimpleSrc(base, conf, srcs);
+    job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
+        SequenceFileInputFormat.class, src));
+    job.setInt("testdatamerge.sources", srcs);
+    job.setInputFormat(CompositeInputFormat.class);
+    job.setOutputPath(new Path(base, "out"));
+
+    job.setMapperClass(c);
+    job.setReducerClass(c);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    JobClient.runJob(job);
+    base.getFileSystem(job).delete(base);
+  }
+
+  public void testSimpleInnerJoin() throws Exception {
+    joinAs("inner", InnerJoinChecker.class);
+  }
+
+  public void testSimpleOuterJoin() throws Exception {
+    joinAs("outer", OuterJoinChecker.class);
+  }
+
+  public void testSimpleOverride() throws Exception {
+    joinAs("override", OverrideChecker.class);
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java?rev=602240&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Fri Dec  7 14:01:32 2007
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+public class TestTupleWritable extends TestCase {
+
+  private TupleWritable makeTuple(Writable[] writs) {
+    Writable[] sub1 = { writs[1], writs[2] };
+    Writable[] sub3 = { writs[4], writs[5] };
+    Writable[] sub2 = { writs[3], new TupleWritable(sub3), writs[6] };
+    Writable[] vals = { writs[0], new TupleWritable(sub1),
+                        new TupleWritable(sub2), writs[7], writs[8],
+                        writs[9] };
+    // [v0, [v1, v2], [v3, [v4, v5], v6], v7, v8, v9]
+    TupleWritable ret = new TupleWritable(vals);
+    for (int i = 0; i < 6; ++i) {
+      ret.setWritten(i);
+    }
+    ((TupleWritable)sub2[1]).setWritten(0);
+    ((TupleWritable)sub2[1]).setWritten(1);
+    ((TupleWritable)vals[1]).setWritten(0);
+    ((TupleWritable)vals[1]).setWritten(1);
+    for (int i = 0; i < 3; ++i) {
+      ((TupleWritable)vals[2]).setWritten(i);
+    }
+    return ret;
+  }
+
+  private int verifIter(Writable[] writs, TupleWritable t, int i) {
+    for (Writable w : t) {
+      if (w instanceof TupleWritable) {
+        i = verifIter(writs, ((TupleWritable)w), i);
+        continue;
+      }
+      assertTrue("Bad value", w.equals(writs[i++]));
+    }
+    return i;
+  }
+
+  public void testIterable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable t = new TupleWritable(writs);
+    for (int i = 0; i < 6; ++i) {
+      t.setWritten(i);
+    }
+    verifIter(writs, t, 0);
+  }
+
+  public void testNestedIterable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable sTuple = makeTuple(writs);
+    assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
+  }
+
+  public void testWritable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable sTuple = makeTuple(writs);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    sTuple.write(new DataOutputStream(out));
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    TupleWritable dTuple = new TupleWritable();
+    dTuple.readFields(new DataInputStream(in));
+    assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+  }
+
+}