You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/02 11:41:33 UTC

svn commit: r1546951 [1/2] - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/join/ core/src/test/java/org/apache/hama/bsp/

Author: millecker
Date: Mon Dec  2 10:41:32 2013
New Revision: 1546951

URL: http://svn.apache.org/r1546951
Log:
HAMA-774: CompositeInputFormat in Hama

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1546951&r1=1546950&r2=1546951&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Dec  2 10:41:32 2013
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-774: CompositeInputFormat in Hama (Martin Illecker)
    HAMA-815: Hama Pipes uses C++ templates (Martin Illecker)  
 
   BUG FIXES

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1546951&r1=1546950&r2=1546951&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Dec  2 10:41:32 2013
@@ -324,7 +324,8 @@ public class BSPJobClient extends Config
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
     // only create the splits if we have an input
-    if (job.get("bsp.input.dir") != null) {
+    if ((job.get("bsp.input.dir") != null)
+        || (job.get("bsp.join.expr") != null)) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
 

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+
+/**
+ * Refinement of InputFormat requiring implementors to provide
+ * ComposableRecordReader instead of RecordReader.
+ */
+public interface ComposableInputFormat<K extends WritableComparable, V extends Writable>
+    extends InputFormat<K, V> {
+
+  ComposableRecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException;
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Additional operations required of a RecordReader to participate in a join.
+ */
+public interface ComposableRecordReader<K extends WritableComparable, V extends Writable>
+    extends RecordReader<K, V>, Comparable<ComposableRecordReader<K, ?>> {
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  int id();
+
+  /**
+   * Return the key this RecordReader would supply on a call to next(K,V)
+   */
+  K key();
+
+  /**
+   * Clone the key at the head of this RecordReader into the object provided.
+   */
+  void key(K key) throws IOException;
+
+  /**
+   * Returns true if the stream is not empty, but provides no guarantee that a
+   * call to next(K,V) will succeed.
+   */
+  boolean hasNext();
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  void skip(K key) throws IOException;
+
+  /**
+   * While key-value pairs from this RecordReader match the given key, register
+   * them with the JoinCollector provided.
+   */
+  void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+
+public class CompositeInputFormat<K extends WritableComparable> implements
+    ComposableInputFormat<K, TupleWritable> {
+
+  // expression parse tree to which IF requests are proxied
+  private Parser.Node root;
+
+  public CompositeInputFormat() {
+  }
+
+  /**
+   * Interpret a given string as a composite expression.
+   * {@code
+   *   func  ::= <ident>([<func>,]*<func>)
+   *   func  ::= tbl(<class>,"<path>")
+   *   class ::= @see java.lang.Class#forName(java.lang.String)
+   *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
+   * } Reads expression from the <tt>bsp.join.expr</tt> property and
+   * user-supplied join types from <tt>bsp.join.define.&lt;ident&gt;</tt>
+   * types. Paths supplied to <tt>tbl</tt> are given as input paths to the
+   * InputFormat class listed.
+   * 
+   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
+   */
+  public void setFormat(BSPJob job) throws IOException {
+    addDefaults();
+    addUserIdentifiers(job);
+    root = Parser.parse(job.get("bsp.join.expr"), job);
+  }
+
+  /**
+   * Adds the default set of identifiers to the parser.
+   */
+  protected void addDefaults() {
+    try {
+      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
+      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
+      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
+      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("FATAL: Failed to init defaults", e);
+    }
+  }
+
+  /**
+   * Inform the parser of user-defined types.
+   */
+  private void addUserIdentifiers(BSPJob job) throws IOException {
+    Pattern x = Pattern.compile("^bsp\\.join\\.define\\.(\\w+)$");
+    for (Map.Entry<String, String> kv : job.getConfiguration()) {
+      Matcher m = x.matcher(kv.getKey());
+      if (m.matches()) {
+        try {
+          Parser.CNode.addIdentifier(m.group(1), job.getConfiguration()
+              .getClass(m.group(0), null, ComposableRecordReader.class));
+        } catch (NoSuchMethodException e) {
+          throw (IOException) new IOException("Invalid define for "
+              + m.group(1)).initCause(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a CompositeInputSplit from the child InputFormats by assigning the
+   * ith split from each child to the ith composite split.
+   */
+  public InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException {
+    setFormat(job);
+    job.getConfiguration().setLong("bsp.min.split.size", Long.MAX_VALUE);
+    return root.getSplits(job, numBspTask);
+  }
+
+  /**
+   * Construct a CompositeRecordReader for the children of this InputFormat as
+   * defined in the init expression. The outermost join need only be composable,
+   * not necessarily a composite. Mandating TupleWritable isn't strictly
+   * correct.
+   */
+  @SuppressWarnings("unchecked")
+  // child types unknown
+  public ComposableRecordReader<K, TupleWritable> getRecordReader(
+      InputSplit split, BSPJob job) throws IOException {
+    setFormat(job);
+    return root.getRecordReader(split, job);
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given InputFormat
+   * class (inf), path (p) return:
+   * {@code tbl(<inf>,<p>) }
+   */
+  public static String compose(Class<? extends InputFormat> inf, String path) {
+    return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given operation
+   * (op), InputFormat class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      String... path) {
+    final String infname = inf.getName();
+    StringBuffer ret = new StringBuffer(op + '(');
+    for (String p : path) {
+      compose(infname, p, ret);
+      ret.append(',');
+    }
+    ret.setCharAt(ret.length() - 1, ')');
+    return ret.toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given operation
+   * (op), InputFormat class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      Path... path) {
+    ArrayList<String> tmp = new ArrayList<String>(path.length);
+    for (Path p : path) {
+      tmp.add(p.toString());
+    }
+    return compose(op, inf, tmp.toArray(new String[0]));
+  }
+
+  private static StringBuffer compose(String inf, String path, StringBuffer sb) {
+    sb.append("tbl(" + inf + ",\"");
+    sb.append(path);
+    sb.append("\")");
+    return sb;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.InputSplit;
+
+/**
+ * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
+ * into this collection must have a public default constructor.
+ */
+public class CompositeInputSplit implements InputSplit {
+
+  private int fill = 0;
+  private long totsize = 0L;
+  private InputSplit[] splits;
+
+  public CompositeInputSplit() {
+  }
+
+  public CompositeInputSplit(int capacity) {
+    splits = new InputSplit[capacity];
+  }
+
+  /**
+   * Add an InputSplit to this collection.
+   * 
+   * @throws IOException If capacity was not specified during construction or if
+   *           capacity has been reached.
+   */
+  public void add(InputSplit s) throws IOException {
+    if (null == splits) {
+      throw new IOException("Uninitialized InputSplit");
+    }
+    if (fill == splits.length) {
+      throw new IOException("Too many splits");
+    }
+    splits[fill++] = s;
+    totsize += s.getLength();
+  }
+
+  /**
+   * Get ith child InputSplit.
+   */
+  public InputSplit get(int i) {
+    return splits[i];
+  }
+
+  /**
+   * Return the aggregate length of all child InputSplits currently added.
+   */
+  public long getLength() throws IOException {
+    return totsize;
+  }
+
+  /**
+   * Get the length of ith child InputSplit.
+   */
+  public long getLength(int i) throws IOException {
+    return splits[i].getLength();
+  }
+
+  /**
+   * Collect a set of hosts from all child InputSplits.
+   */
+  public String[] getLocations() throws IOException {
+    HashSet<String> hosts = new HashSet<String>();
+    for (InputSplit s : splits) {
+      String[] hints = s.getLocations();
+      if (hints != null && hints.length > 0) {
+        for (String host : hints) {
+          hosts.add(host);
+        }
+      }
+    }
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
+  /**
+   * getLocations from ith InputSplit.
+   */
+  public String[] getLocation(int i) throws IOException {
+    return splits[i].getLocations();
+  }
+
+  /**
+   * Write splits in the following format.
+   * {@code
+   * <count><class1><class2>...<classn><split1><split2>...<splitn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, splits.length);
+    for (InputSplit s : splits) {
+      Text.writeString(out, s.getClass().getName());
+    }
+    for (InputSplit s : splits) {
+      s.write(out);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * @throws IOException If the child InputSplit cannot be read, typically for
+   *           failing access checks.
+   */
+  @SuppressWarnings("unchecked")
+  // Generic array assignment
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    if (splits == null || splits.length != card) {
+      splits = new InputSplit[card];
+    }
+    Class<? extends InputSplit>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] = Class.forName(Text.readString(in))
+            .asSubclass(InputSplit.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        splits[i] = ReflectionUtils.newInstance(cls[i], null);
+        splits[i].readFields(in);
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException) new IOException("Failed split init").initCause(e);
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * A RecordReader that can effect joins of RecordReaders sharing a common key
+ * type and partitioning.
+ */
+public abstract class CompositeRecordReader<K extends WritableComparable, // key
+// type
+V extends Writable, // accepts RecordReader<K,V> as children
+X extends Writable> // emits Writables of this type
+    implements Configurable {
+
+  private int id;
+  private Configuration conf;
+  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
+
+  private WritableComparator cmp;
+  private Class<? extends WritableComparable> keyclass;
+  private PriorityQueue<ComposableRecordReader<K, ?>> q;
+
+  protected final JoinCollector jc;
+  protected final ComposableRecordReader<K, ? extends V>[] kids;
+
+  protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+  /**
+   * Create a RecordReader with <tt>capacity</tt> children to position
+   * <tt>id</tt> in the parent reader. The id of a root CompositeRecordReader is
+   * -1 by convention, but relying on this is not recommended.
+   */
+  @SuppressWarnings("unchecked")
+  // Generic array assignment
+  public CompositeRecordReader(int id, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    assert capacity > 0 : "Invalid capacity";
+    this.id = id;
+    if (null != cmpcl) {
+      cmp = ReflectionUtils.newInstance(cmpcl, null);
+      q = new PriorityQueue<ComposableRecordReader<K, ?>>(3,
+          new Comparator<ComposableRecordReader<K, ?>>() {
+            public int compare(ComposableRecordReader<K, ?> o1,
+                ComposableRecordReader<K, ?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    jc = new JoinCollector(capacity);
+    kids = new ComposableRecordReader[capacity];
+  }
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return sorted list of RecordReaders for this composite.
+   */
+  protected PriorityQueue<ComposableRecordReader<K, ?>> getRecordReaderQueue() {
+    return q;
+  }
+
+  /**
+   * Return comparator defining the ordering for RecordReaders in this
+   * composite.
+   */
+  protected WritableComparator getComparator() {
+    return cmp;
+  }
+
+  /**
+   * Add a RecordReader to this collection. The id() of a RecordReader
+   * determines where in the Tuple its entry will appear. Adding RecordReaders
+   * with the same id has undefined behavior.
+   */
+  public void add(ComposableRecordReader<K, ? extends V> rr) throws IOException {
+    kids[rr.id()] = rr;
+    if (null == q) {
+      cmp = WritableComparator.get(rr.createKey().getClass());
+      q = new PriorityQueue<ComposableRecordReader<K, ?>>(3,
+          new Comparator<ComposableRecordReader<K, ?>>() {
+            public int compare(ComposableRecordReader<K, ?> o1,
+                ComposableRecordReader<K, ?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    if (rr.hasNext()) {
+      q.add(rr);
+    }
+  }
+
+  /**
+   * Collector for join values. This accumulates values for a given key from the
+   * child RecordReaders. If one or more child RR contain duplicate keys, this
+   * will emit the cross product of the associated values until exhausted.
+   */
+  class JoinCollector {
+    private K key;
+    private ResetableIterator<X>[] iters;
+    private int pos = -1;
+    private boolean first = true;
+
+    /**
+     * Construct a collector capable of handling the specified number of
+     * children.
+     */
+    @SuppressWarnings("unchecked")
+    // Generic array assignment
+    public JoinCollector(int card) {
+      iters = new ResetableIterator[card];
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i] = EMPTY;
+      }
+    }
+
+    /**
+     * Register a given iterator at position id.
+     */
+    public void add(int id, ResetableIterator<X> i) throws IOException {
+      iters[id] = i;
+    }
+
+    /**
+     * Return the key associated with this collection.
+     */
+    public K key() {
+      return key;
+    }
+
+    /**
+     * Codify the contents of the collector to be iterated over. When this is
+     * called, all RecordReaders registered for this key should have added
+     * ResetableIterators.
+     */
+    public void reset(K key) {
+      this.key = key;
+      first = true;
+      pos = iters.length - 1;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].reset();
+      }
+    }
+
+    /**
+     * Clear all state information.
+     */
+    public void clear() {
+      key = null;
+      pos = -1;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].clear();
+        iters[i] = EMPTY;
+      }
+    }
+
+    /**
+     * Returns false if exhausted or if reset(K) has not been called.
+     */
+    protected boolean hasNext() {
+      return !(pos < 0);
+    }
+
+    /**
+     * Populate Tuple from iterators. It should be the case that, given
+     * iterators i_1...i_n over values from sources s_1...s_n sharing key k,
+     * repeated calls to next should yield the cross product i_1 x ... x i_n.
+     */
+    @SuppressWarnings("unchecked")
+    // No static typeinfo on Tuples
+    protected boolean next(TupleWritable val) throws IOException {
+      if (first) {
+        int i = -1;
+        for (pos = 0; pos < iters.length; ++pos) {
+          if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) {
+            i = pos;
+            val.setWritten(i);
+          }
+        }
+        pos = i;
+        first = false;
+        if (pos < 0) {
+          clear();
+          return false;
+        }
+        return true;
+      }
+      while (0 <= pos
+          && !(iters[pos].hasNext() && iters[pos].next((X) val.get(pos)))) {
+        --pos;
+      }
+      if (pos < 0) {
+        clear();
+        return false;
+      }
+      val.setWritten(pos);
+      for (int i = 0; i < pos; ++i) {
+        if (iters[i].replay((X) val.get(i))) {
+          val.setWritten(i);
+        }
+      }
+      while (pos + 1 < iters.length) {
+        ++pos;
+        iters[pos].reset();
+        if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) {
+          val.setWritten(pos);
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Replay the last Tuple emitted.
+     */
+    @SuppressWarnings("unchecked")
+    // No static typeinfo on Tuples
+    public boolean replay(TupleWritable val) throws IOException {
+      // The last emitted tuple might have drawn on an empty source;
+      // it can't be cleared prematurely, b/c there may be more duplicate
+      // keys in iterator positions < pos
+      assert !first;
+      boolean ret = false;
+      for (int i = 0; i < iters.length; ++i) {
+        if (iters[i].replay((X) val.get(i))) {
+          val.setWritten(i);
+          ret = true;
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Close all child iterators.
+     */
+    public void close() throws IOException {
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].close();
+      }
+    }
+
+    /**
+     * Write the next value into key, value as accepted by the operation
+     * associated with this set of RecordReaders.
+     */
+    public boolean flush(TupleWritable value) throws IOException {
+      while (hasNext()) {
+        value.clearWritten();
+        if (next(value) && combine(kids, value)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Return the key for the current join or the value at the top of the
+   * RecordReader heap.
+   */
+  public K key() {
+    if (jc.hasNext()) {
+      return jc.key();
+    }
+    if (!q.isEmpty()) {
+      return q.peek().key();
+    }
+    return null;
+  }
+
+  /**
+   * Clone the key at the top of this RR into the given object.
+   */
+  public void key(K key) throws IOException {
+    WritableUtils.cloneInto(key, key());
+  }
+
+  /**
+   * Return true if it is possible that this could emit more values.
+   */
+  public boolean hasNext() {
+    return jc.hasNext() || !q.isEmpty();
+  }
+
+  /**
+   * Pass skip key to child RRs.
+   */
+  public void skip(K key) throws IOException {
+    ArrayList<ComposableRecordReader<K, ?>> tmp = new ArrayList<ComposableRecordReader<K, ?>>();
+    while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
+      tmp.add(q.poll());
+    }
+    for (ComposableRecordReader<K, ?> rr : tmp) {
+      rr.skip(key);
+      if (rr.hasNext()) {
+        q.add(rr);
+      }
+    }
+  }
+
+  /**
+   * Obtain an iterator over the child RRs apropos of the value type ultimately
+   * emitted from this join.
+   */
+  protected abstract ResetableIterator<X> getDelegate();
+
+  /**
+   * If key provided matches that of this Composite, give JoinCollector iterator
+   * over values it may emit.
+   */
+  @SuppressWarnings("unchecked")
+  // No values from static EMPTY class
+  public void accept(CompositeRecordReader.JoinCollector jc, K key)
+      throws IOException {
+    if (hasNext() && 0 == cmp.compare(key, key())) {
+      fillJoinCollector(createKey());
+      jc.add(id, getDelegate());
+      return;
+    }
+    jc.add(id, EMPTY);
+  }
+
+  /**
+   * For all child RRs offering the key provided, obtain an iterator at that
+   * position in the JoinCollector.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    if (!q.isEmpty()) {
+      q.peek().key(iterkey);
+      while (0 == cmp.compare(q.peek().key(), iterkey)) {
+        ComposableRecordReader<K, ?> t = q.poll();
+        t.accept(jc, iterkey);
+        if (t.hasNext()) {
+          q.add(t);
+        } else if (q.isEmpty()) {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Implement Comparable contract (compare key of join or head of heap with
+   * that of another).
+   */
+  public int compareTo(ComposableRecordReader<K, ?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Create a new key value common to all child RRs.
+   * 
+   * @throws ClassCastException if key classes differ.
+   */
+  @SuppressWarnings("unchecked")
+  // Explicit check for key class agreement
+  public K createKey() {
+    if (null == keyclass) {
+      final Class<?> cls = kids[0].createKey().getClass();
+      for (RecordReader<K, ? extends Writable> rr : kids) {
+        if (!cls.equals(rr.createKey().getClass())) {
+          throw new ClassCastException("Child key classes fail to agree");
+        }
+      }
+      keyclass = cls.asSubclass(WritableComparable.class);
+    }
+    return (K) ReflectionUtils.newInstance(keyclass, getConf());
+  }
+
+  /**
+   * Create a value to be used internally for joins.
+   */
+  protected TupleWritable createInternalValue() {
+    Writable[] vals = new Writable[kids.length];
+    for (int i = 0; i < vals.length; ++i) {
+      vals[i] = kids[i].createValue();
+    }
+    return new TupleWritable(vals);
+  }
+
+  /**
+   * Unsupported (returns zero in all cases).
+   */
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  /**
+   * Close all child RRs.
+   */
+  public void close() throws IOException {
+    if (kids != null) {
+      for (RecordReader<K, ? extends Writable> rr : kids) {
+        rr.close();
+      }
+    }
+    if (jc != null) {
+      jc.close();
+    }
+  }
+
+  /**
+   * Report progress as the minimum of all child RR progress.
+   */
+  public float getProgress() throws IOException {
+    float ret = 1.0f;
+    for (RecordReader<K, ? extends Writable> rr : kids) {
+      ret = Math.min(ret, rr.getProgress());
+    }
+    return ret;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Full inner join: {@link #combine} accepts a tuple only when every position
+ * in it is populated.
+ */
+public class InnerJoinRecordReader<K extends WritableComparable> extends
+    JoinRecordReader<K> {
+
+  InnerJoinRecordReader(int id, BSPJob job, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, job, capacity, cmpcl);
+  }
+
+  /**
+   * Return true iff the tuple is full (all data sources contain this key).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    int slot = 0;
+    // Advance past populated positions; stop at the first missing one.
+    while (slot < srcs.length && dst.has(slot)) {
+      ++slot;
+    }
+    return slot == srcs.length;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Base class for Composite joins returning Tuples of arbitrary Writables.
+ */
+public abstract class JoinRecordReader<K extends WritableComparable> extends
+    CompositeRecordReader<K, Writable, TupleWritable> implements
+    ComposableRecordReader<K, TupleWritable> {
+
+  public JoinRecordReader(int id, BSPJob job, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(job.getConfiguration());
+  }
+
+  /**
+   * Emit the next set of key, value pairs as defined by the child RecordReaders
+   * and operation associated with this composite RR.
+   *
+   * @param key receives a copy of the collector's current key
+   * @param value tuple the collector flushes into
+   * @return true if a pair was produced, false once the reader queue is empty
+   *         and the collector has nothing left to flush
+   */
+  public boolean next(K key, TupleWritable value) throws IOException {
+    // Whatever the collector still holds is flushed first.
+    if (flushInto(key, value)) {
+      return true;
+    }
+    jc.clear();
+    final K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K, ?>> readers = getRecordReaderQueue();
+    while (!readers.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (flushInto(key, value)) {
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /**
+   * Flush the collector into value and, on success, copy the collector's key
+   * into key.
+   */
+  private boolean flushInto(K key, TupleWritable value) throws IOException {
+    if (!jc.flush(value)) {
+      return false;
+    }
+    WritableUtils.cloneInto(key, jc.key());
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  public TupleWritable createValue() {
+    return createInternalValue();
+  }
+
+  /**
+   * Return an iterator wrapping the JoinCollector.
+   */
+  protected ResetableIterator<TupleWritable> getDelegate() {
+    return new JoinDelegationIterator();
+  }
+
+  /**
+   * Iterator proxy that forwards every operation to the JoinCollector, which
+   * carries out the actual join; adding items is not supported.
+   */
+  protected class JoinDelegationIterator implements
+      ResetableIterator<TupleWritable> {
+
+    public void add(TupleWritable item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(TupleWritable val) throws IOException {
+      return jc.flush(val);
+    }
+
+    public boolean replay(TupleWritable val) throws IOException {
+      return jc.replay(val);
+    }
+
+    public void reset() {
+      // Reset the collector using the key it currently reports.
+      jc.reset(jc.key());
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Base class for Composite join returning values derived from multiple sources,
+ * but generally not tuples.
+ */
+public abstract class MultiFilterRecordReader<K extends WritableComparable, V extends Writable>
+    extends CompositeRecordReader<K, V, V> implements
+    ComposableRecordReader<K, V> {
+
+  // Value class shared by all children; resolved by the first createValue().
+  private Class<? extends Writable> valueclass;
+  // Scratch tuple handed to the collector; created on first use, see
+  // internalValue().
+  private TupleWritable ivalue;
+
+  public MultiFilterRecordReader(int id, BSPJob job, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(job.getConfiguration());
+  }
+
+  /**
+   * For each tuple emitted, return a value (typically one of the values in the
+   * tuple). Modifying the Writables in the tuple is permitted and unlikely to
+   * affect join behavior in most cases, but it is not recommended. It's safer
+   * to clone first.
+   */
+  protected abstract V emit(TupleWritable dst) throws IOException;
+
+  /**
+   * Default implementation offers {@link #emit} every Tuple from the collector
+   * (the outer join of child RRs).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    return true;
+  }
+
+  /**
+   * Produce the next key and the value {@link #emit} derives from the
+   * collector's tuple.
+   *
+   * @param key receives a copy of the collector's current key
+   * @param value receives a copy of the emitted value
+   * @return true if a pair was produced, false once the reader queue is empty
+   *         and the collector has nothing left to flush
+   */
+  public boolean next(K key, V value) throws IOException {
+    if (flushInto(key, value)) {
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K, ?>> q = getRecordReaderQueue();
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (flushInto(key, value)) {
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /**
+   * Flush the collector into the scratch tuple and, on success, copy the
+   * collector's key and the emitted value into the caller's objects.
+   */
+  private boolean flushInto(K key, V value) throws IOException {
+    final TupleWritable tuple = internalValue();
+    if (!jc.flush(tuple)) {
+      return false;
+    }
+    WritableUtils.cloneInto(key, jc.key());
+    WritableUtils.cloneInto(value, emit(tuple));
+    return true;
+  }
+
+  /**
+   * Return the scratch tuple, creating it on first use. Without this the
+   * tuple only existed after createValue() had run, so calling next() or the
+   * delegate iterator first handed null to the collector and to emit.
+   */
+  private TupleWritable internalValue() {
+    if (null == ivalue) {
+      ivalue = createInternalValue();
+    }
+    return ivalue;
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  // Explicit check for value class agreement
+  public V createValue() {
+    if (null == valueclass) {
+      final Class<?> cls = kids[0].createValue().getClass();
+      for (RecordReader<K, ? extends V> rr : kids) {
+        if (!cls.equals(rr.createValue().getClass())) {
+          throw new ClassCastException("Child value classes fail to agree");
+        }
+      }
+      valueclass = cls.asSubclass(Writable.class);
+      // Keeps an already created scratch tuple instead of replacing it.
+      internalValue();
+    }
+    // The value is instantiated with a null configuration.
+    return (V) ReflectionUtils.newInstance(valueclass, null);
+  }
+
+  /**
+   * Return an iterator returning a single value from the tuple.
+   * 
+   * @see MultiFilterDelegationIterator
+   */
+  protected ResetableIterator<V> getDelegate() {
+    return new MultiFilterDelegationIterator();
+  }
+
+  /**
+   * Proxy the JoinCollector, but include callback to emit.
+   */
+  protected class MultiFilterDelegationIterator implements ResetableIterator<V> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(V val) throws IOException {
+      final TupleWritable tuple = internalValue();
+      if (!jc.flush(tuple)) {
+        return false;
+      }
+      WritableUtils.cloneInto(val, emit(tuple));
+      return true;
+    }
+
+    public boolean replay(V val) throws IOException {
+      // Re-emits from the scratch tuple as it stands; the collector is not
+      // consulted.
+      WritableUtils.cloneInto(val, emit(internalValue()));
+      return true;
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(V item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Full outer join: {@link #combine} accepts every tuple it is offered.
+ */
+public class OuterJoinRecordReader<K extends WritableComparable> extends
+    JoinRecordReader<K> {
+
+  OuterJoinRecordReader(final int id, final BSPJob job, final int capacity,
+      final Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, job, capacity, cmpcl);
+  }
+
+  /**
+   * Emit everything from the collector; the only check is an assertion that
+   * the source array and the tuple have the same size.
+   */
+  protected boolean combine(final Object[] srcs, final TupleWritable dst) {
+    assert srcs.length == dst.size();
+    final boolean accept = true;
+    return accept;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Prefer the &quot;rightmost&quot; data source for this key. For example,
+ * {@code override(S1,S2,S3)} will prefer values from S3 over S2, and values
+ * from S2 over S1 for all keys emitted from all sources.
+ */
+public class OverrideRecordReader<K extends WritableComparable, V extends Writable>
+    extends MultiFilterRecordReader<K, V> {
+
+  OverrideRecordReader(int id, BSPJob job, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, job, capacity, cmpcl);
+  }
+
+  /**
+   * Emit the first value returned by the tuple's iterator.
+   *
+   * NOTE(review): presumably this is the value of the preferred source,
+   * because {@link #fillJoinCollector} lets only that reader contribute to
+   * the collector - confirm against TupleWritable's iterator.
+   */
+  @SuppressWarnings("unchecked")
+  // No static typeinfo on Tuples
+  protected V emit(TupleWritable dst) {
+    return (V) dst.iterator().next();
+  }
+
+  /**
+   * Instead of filling the JoinCollector with iterators from all data sources,
+   * fill only the rightmost for this key. This not only saves space by
+   * discarding the other sources, but it also emits the number of key-value
+   * pairs in the preferred RecordReader instead of repeating that stream n
+   * times, where n is the cardinality of the cross product of the discarded
+   * streams for the given key.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    final PriorityQueue<ComposableRecordReader<K, ?>> q = getRecordReaderQueue();
+    if (q.isEmpty()) {
+      return;
+    }
+    final ArrayList<ComposableRecordReader<K, ?>> matching = new ArrayList<ComposableRecordReader<K, ?>>(
+        kids.length);
+    // Take the key of the reader at the head of the queue as the join key.
+    q.peek().key(iterkey);
+    final WritableComparator cmp = getComparator();
+    // Index in 'matching' of the reader with the largest id seen so far.
+    int preferredIdx = -1;
+    while (!q.isEmpty() && 0 == cmp.compare(q.peek().key(), iterkey)) {
+      final ComposableRecordReader<K, ?> candidate = q.poll();
+      if (-1 == preferredIdx
+          || matching.get(preferredIdx).id() < candidate.id()) {
+        preferredIdx = matching.size();
+      }
+      matching.add(candidate);
+    }
+    // Only the preferred reader feeds the collector; the rest skip the key.
+    final ComposableRecordReader<K, ?> preferred = matching.remove(preferredIdx);
+    preferred.accept(jc, iterkey);
+    for (ComposableRecordReader<K, ?> other : matching) {
+      other.skip(iterkey);
+    }
+    matching.add(preferred);
+    // Readers that still have data go back into the queue.
+    for (ComposableRecordReader<K, ?> rr : matching) {
+      if (rr.hasNext()) {
+        q.add(rr);
+      }
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.CharArrayReader;
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Very simple shift-reduce parser for join expressions.
+ * 
+ * This should be sufficient for the user extension permitted now, but ought to
+ * be replaced with a parser generator if more complex grammars are supported.
+ * In particular, this &quot;shift-reduce&quot; parser has no states. Each set
+ * of formals requires a different internal node type, which is responsible for
+ * interpreting the list of tokens it receives. This is sufficient for the
+ * current grammar, but it has several annoying properties that might inhibit
+ * extension. In particular, parentheses are always function calls; an algebraic
+ * or filter grammar would not only require a node type, but must also work
+ * around the internals of this parser.
+ * 
+ * For most other cases, adding classes to the hierarchy- particularly by
+ * extending JoinRecordReader and MultiFilterRecordReader- is fairly
+ * straightforward. One need only override the relevant method(s) (usually only
+ * {@link CompositeRecordReader#combine}) and include a property to map its
+ * value to an identifier in the parser.
+ */
+public class Parser {
+  /**
+   * Token kinds. The lexer produces IDENT for words, QUOT for double-quoted
+   * strings, NUM for numbers and COMMA, LPAREN, RPAREN for the corresponding
+   * characters; CIF tags a token that carries a parse-tree node.
+   */
+  public enum TType {
+    CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM,
+  }
+
+  /**
+   * Tagged-union type for tokens from the join expression. The base class
+   * carries only the tag; each payload accessor throws an IOException unless
+   * a subclass overrides it.
+   * 
+   * @see Parser.TType
+   */
+  public static class Token {
+
+    private final TType type;
+
+    Token(TType type) {
+      this.type = type;
+    }
+
+    /** Return the tag given at construction. */
+    public TType getType() {
+      return this.type;
+    }
+
+    /** Node payload; not available on the base token. */
+    public Node getNode() throws IOException {
+      throw new IOException("Expected nodetype");
+    }
+
+    /** Numeric payload; not available on the base token. */
+    public double getNum() throws IOException {
+      throw new IOException("Expected numtype");
+    }
+
+    /** String payload; not available on the base token. */
+    public String getStr() throws IOException {
+      throw new IOException("Expected strtype");
+    }
+  }
+
+  /** Token tagged NUM that carries a numeric value. */
+  public static class NumToken extends Token {
+    private final double num;
+
+    public NumToken(double num) {
+      super(TType.NUM);
+      this.num = num;
+    }
+
+    public double getNum() {
+      return this.num;
+    }
+  }
+
+  /** Token tagged CIF that carries a parse-tree node. */
+  public static class NodeToken extends Token {
+    private final Node node;
+
+    NodeToken(Node node) {
+      super(TType.CIF);
+      this.node = node;
+    }
+
+    public Node getNode() {
+      return this.node;
+    }
+  }
+
+  /** Token carrying a string; the tag is chosen by the creator. */
+  public static class StrToken extends Token {
+    private final String str;
+
+    public StrToken(TType type, String str) {
+      super(type);
+      this.str = str;
+    }
+
+    public String getStr() {
+      return this.str;
+    }
+  }
+
+  /**
+   * Simple lexer wrapping a StreamTokenizer. This encapsulates the creation of
+   * tagged-union Tokens and initializes the StreamTokenizer.
+   */
+  private static class Lexer {
+
+    private StreamTokenizer tok;
+
+    Lexer(String s) {
+      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
+      tok.quoteChar('"');
+      tok.parseNumbers();
+      // The grammar's punctuation is reported one character at a time.
+      for (char punct : new char[] { ',', '(', ')' }) {
+        tok.ordinaryChar(punct);
+      }
+      // '$' and '_' may appear inside a word.
+      for (char extra : new char[] { '$', '_' }) {
+        tok.wordChars(extra, extra);
+      }
+    }
+
+    /**
+     * Return the next token, or null at end of input (or end of line).
+     * 
+     * @throws IOException on a character the grammar does not know
+     */
+    Token next() throws IOException {
+      final int type = tok.nextToken();
+      if (StreamTokenizer.TT_EOF == type || StreamTokenizer.TT_EOL == type) {
+        return null;
+      }
+      if (StreamTokenizer.TT_NUMBER == type) {
+        return new NumToken(tok.nval);
+      }
+      if (StreamTokenizer.TT_WORD == type) {
+        return new StrToken(TType.IDENT, tok.sval);
+      }
+      switch (type) {
+        case '"':
+          return new StrToken(TType.QUOT, tok.sval);
+        case ',':
+          return new Token(TType.COMMA);
+        case '(':
+          return new Token(TType.LPAREN);
+        case ')':
+          return new Token(TType.RPAREN);
+        default:
+          throw new IOException("Unexpected: " + type);
+      }
+    }
+  }
+
+  /**
+   * Base class of the parse-tree nodes. Two static registries map an
+   * identifier to the constructor of its node type and to the constructor of
+   * the ComposableRecordReader it creates.
+   */
+  public abstract static class Node implements ComposableInputFormat {
+    // Every node type is constructed from its identifier alone.
+    private static final Class<?>[] ncstrSig = { String.class };
+    private static final Map<String, Constructor<? extends Node>> nodeCstrMap = new HashMap<String, Constructor<? extends Node>>();
+    protected static final Map<String, Constructor<? extends ComposableRecordReader>> rrCstrMap = new HashMap<String, Constructor<? extends ComposableRecordReader>>();
+
+    // inst
+    protected int id = -1;
+    protected String ident;
+    protected Class<? extends WritableComparator> cmpcl;
+
+    protected Node(String ident) {
+      this.ident = ident;
+    }
+
+    /**
+     * Return the node type registered for the particular identifier. By
+     * default, this is a CNode for any composite node and a WNode for
+     * &quot;wrapped&quot; nodes. User nodes will likely be composite nodes.
+     * 
+     * @throws IOException if no node type is registered for the identifier,
+     *           or wrapping the reflection failure if construction fails
+     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class,
+     *      java.lang.Class)
+     * @see CompositeInputFormat#setFormat(org.apache.hama.bsp.BSPJob)
+     */
+    static Node forIdent(String ident) throws IOException {
+      final Constructor<? extends Node> cstr = nodeCstrMap.get(ident);
+      if (cstr == null) {
+        throw new IOException("No nodetype for " + ident);
+      }
+      try {
+        return cstr.newInstance(ident);
+      } catch (IllegalAccessException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException) new IOException().initCause(e);
+      }
+    }
+
+    /**
+     * For a given identifier, add a mapping to the nodetype for the parse tree
+     * and to the ComposableRecordReader to be created, including the formals
+     * required to invoke the constructor. The nodetype and constructor
+     * signature should be filled in from the child node.
+     * 
+     * @throws NoSuchMethodException if either class lacks a constructor with
+     *           the expected signature
+     */
+    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
+        Class<? extends Node> nodetype,
+        Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      // Both constructors are looked up as declared and made accessible.
+      final Constructor<? extends Node> nodeCstr = nodetype
+          .getDeclaredConstructor(ncstrSig);
+      nodeCstr.setAccessible(true);
+      nodeCstrMap.put(ident, nodeCstr);
+      final Constructor<? extends ComposableRecordReader> readerCstr = cl
+          .getDeclaredConstructor(mcstrSig);
+      readerCstr.setAccessible(true);
+      rrCstrMap.put(ident, readerCstr);
+    }
+
+    protected void setID(int id) {
+      this.id = id;
+    }
+
+    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      this.cmpcl = cmpcl;
+    }
+
+    /** Interpret the tokens found between this node's parentheses. */
+    abstract void parse(List<Token> args, BSPJob job) throws IOException;
+  }
+
+  /**
+   * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
+   */
+  static class WNode extends Node {
+    // Formals of the record reader constructor: (int, RecordReader, Class).
+    private static final Class<?>[] cstrSig = { Integer.TYPE,
+        RecordReader.class, Class.class };
+
+    static void addIdentifier(String ident,
+        Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
+    }
+
+    private String indir;
+    private InputFormat inf;
+
+    public WNode(String ident) {
+      super(ident);
+    }
+
+    /**
+     * Let the first actual define the InputFormat and the second define the
+     * {@code bsp.input.dir} property.
+     * 
+     * The string values of all tokens before the first comma are concatenated
+     * into the InputFormat class name; the token after the comma must be a
+     * quoted string.
+     */
+    public void parse(List<Token> ll, BSPJob job) throws IOException {
+      final StringBuilder className = new StringBuilder();
+      final Iterator<Token> it = ll.iterator();
+      while (it.hasNext()) {
+        final Token part = it.next();
+        if (!TType.COMMA.equals(part.getType())) {
+          className.append(part.getStr());
+          continue;
+        }
+        // The comma ends the class name: instantiate the format and stop.
+        try {
+          inf = (InputFormat) ReflectionUtils.newInstance(job
+              .getConfiguration().getClassByName(className.toString()), job
+              .getConfiguration());
+        } catch (ClassNotFoundException e) {
+          throw (IOException) new IOException().initCause(e);
+        } catch (IllegalArgumentException e) {
+          throw (IOException) new IOException().initCause(e);
+        }
+        break;
+      }
+      if (!it.hasNext()) {
+        throw new IOException("Parse error");
+      }
+      final Token path = it.next();
+      if (!TType.QUOT.equals(path.getType())) {
+        throw new IOException("Expected quoted string");
+      }
+      indir = path.getStr();
+      // no check for ll.isEmpty() to permit extension
+    }
+
+    /**
+     * Build a job whose input path is this node's directory.
+     * 
+     * NOTE(review): if BSPJob wraps the given configuration rather than
+     * copying it, setting the input path here also changes the caller's job
+     * - confirm against the BSPJob constructor.
+     */
+    private BSPJob getConf(BSPJob job) throws IOException {
+      final HamaConfiguration conf = (HamaConfiguration) job.getConfiguration();
+      final BSPJob scoped = new BSPJob(conf);
+      FileInputFormat.setInputPaths(scoped, indir);
+      return scoped;
+    }
+
+    public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+      final BSPJob scoped = getConf(job);
+      return inf.getSplits(scoped, numSplits);
+    }
+
+    /**
+     * Open the wrapped format's reader for the split and hand it to the
+     * record reader registered for this node's identifier.
+     */
+    public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job)
+        throws IOException {
+      if (!rrCstrMap.containsKey(ident)) {
+        throw new IOException("No RecordReader for " + ident);
+      }
+      final Constructor<? extends ComposableRecordReader> cstr = rrCstrMap
+          .get(ident);
+      final RecordReader wrapped = inf.getRecordReader(split, getConf(job));
+      try {
+        return cstr.newInstance(id, wrapped, cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException) new IOException().initCause(e);
+      }
+    }
+
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append(ident).append("(").append(inf.getClass().getName());
+      sb.append(",\"").append(indir).append("\")");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Internal nodetype for &quot;composite&quot; InputFormats.
+   */
+  static class CNode extends Node {
+
+    // Formals of the record reader constructor: (int, BSPJob, int, Class).
+    private static final Class<?>[] cstrSig = { Integer.TYPE, BSPJob.class,
+        Integer.TYPE, Class.class };
+
+    static void addIdentifier(String ident,
+        Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
+    }
+
+    // inst
+    private ArrayList<Node> kids = new ArrayList<Node>();
+
+    public CNode(String ident) {
+      super(ident);
+    }
+
+    /**
+     * Set the comparator on this node and on every child parsed so far.
+     */
+    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      super.setKeyComparator(cmpcl);
+      for (Node n : kids) {
+        n.setKeyComparator(cmpcl);
+      }
+    }
+
+    /**
+     * Combine InputSplits from child InputFormats into a
+     * {@link CompositeInputSplit}.
+     * 
+     * @throws IOException if this node has no children, a child returns no
+     *           splits, or two children return a different number of splits
+     */
+    public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+      if (kids.isEmpty()) {
+        // Previously this ran into an ArrayIndexOutOfBoundsException below.
+        throw new IOException("No child nodes for " + ident);
+      }
+      InputSplit[][] splits = new InputSplit[kids.size()][];
+      for (int i = 0; i < kids.size(); ++i) {
+        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+        if (null == tmp) {
+          throw new IOException("Error gathering splits from child RReader");
+        }
+        if (i > 0 && splits[i - 1].length != tmp.length) {
+          throw new IOException("Inconsistent split cardinality from child "
+              + i + " (" + splits[i - 1].length + "/" + tmp.length + ")");
+        }
+        splits[i] = tmp;
+      }
+      // Composite split i holds split i of every child, in child order.
+      final int size = splits[0].length;
+      CompositeInputSplit[] ret = new CompositeInputSplit[size];
+      for (int i = 0; i < size; ++i) {
+        ret[i] = new CompositeInputSplit(splits.length);
+        for (int j = 0; j < splits.length; ++j) {
+          ret[i].add(splits[j][i]);
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Create the composite reader registered for this node's identifier and
+     * add one reader per child, each opened on the matching part of the
+     * composite split.
+     * 
+     * @throws IOException if the split is not a CompositeInputSplit, no
+     *           reader is registered for the identifier, or construction
+     *           fails
+     */
+    @SuppressWarnings("unchecked")
+    // child types unknowable
+    public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job)
+        throws IOException {
+      if (!(split instanceof CompositeInputSplit)) {
+        throw new IOException("Invalid split type:"
+            + split.getClass().getName());
+      }
+      final CompositeInputSplit spl = (CompositeInputSplit) split;
+      final int capacity = kids.size();
+      CompositeRecordReader ret = null;
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        ret = (CompositeRecordReader) rrCstrMap.get(ident).newInstance(id, job,
+            capacity, cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException) new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException) new IOException().initCause(e);
+      }
+      for (int i = 0; i < capacity; ++i) {
+        ret.add(kids.get(i).getRecordReader(spl.get(i), job));
+      }
+      return (ComposableRecordReader) ret;
+    }
+
+    /**
+     * Parse a list of comma-separated nodes.
+     */
+    public void parse(List<Token> args, BSPJob job) throws IOException {
+      ListIterator<Token> i = args.listIterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        // Nodes and commas alternate, so the child id is half the token index.
+        t.getNode().setID(i.previousIndex() >> 1);
+        kids.add(t.getNode());
+        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
+          throw new IOException("Expected ','");
+        }
+      }
+    }
+
+    /**
+     * Render as {@code ident(child1,child2,...)}; a node without children
+     * renders as {@code ident()}.
+     */
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(ident).append('(');
+      for (int i = 0; i < kids.size(); ++i) {
+        if (i > 0) {
+          sb.append(',');
+        }
+        sb.append(kids.get(i).toString());
+      }
+      return sb.append(')').toString();
+    }
+  }
+
+  /**
+   * Reduce the top of the stack to a single node token. Tokens are popped as
+   * arguments down to the nearest '(', the identifier below it selects the
+   * node type, and that node parses the arguments.
+   */
+  private static Token reduce(Stack<Token> st, BSPJob job) throws IOException {
+    final LinkedList<Token> args = new LinkedList<Token>();
+    for (;;) {
+      if (st.isEmpty()) {
+        throw new IOException("Unmatched ')'");
+      }
+      if (TType.LPAREN.equals(st.peek().getType())) {
+        break;
+      }
+      // Popped in reverse, so prepend to restore source order.
+      args.addFirst(st.pop());
+    }
+    // Drop the '(' itself.
+    st.pop();
+    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
+      throw new IOException("Identifier expected");
+    }
+    final String ident = st.pop().getStr();
+    final Node node = Node.forIdent(ident);
+    node.parse(args, job);
+    return new NodeToken(node);
+  }
+
+  /**
+   * Given an expression, build a tree of InputFormats. If the job
+   * configuration names a comparator class under
+   * {@code bsp.join.keycomparator}, it is set on the root node as the key
+   * comparator.
+   * 
+   * @throws IOException if the expression is null or does not reduce to
+   *           exactly one node
+   */
+  static Node parse(String expr, BSPJob job) throws IOException {
+    if (expr == null) {
+      throw new IOException("Expression is null");
+    }
+    final Class<? extends WritableComparator> cmpcl = job.getConfiguration()
+        .getClass("bsp.join.keycomparator", null, WritableComparator.class);
+    final Lexer lex = new Lexer(expr);
+    final Stack<Token> st = new Stack<Token>();
+    for (Token tok = lex.next(); tok != null; tok = lex.next()) {
+      // Each ')' collapses the tokens back to its '(' into one node token.
+      st.push(TType.RPAREN.equals(tok.getType()) ? reduce(st, job) : tok);
+    }
+    if (st.size() != 1 || !TType.CIF.equals(st.peek().getType())) {
+      throw new IOException("Missing ')'");
+    }
+    final Node root = st.pop().getNode();
+    if (cmpcl != null) {
+      root.setKeyComparator(cmpcl);
+    }
+    return root;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This defines an interface to a stateful Iterator that can replay elements
+ * added to it directly. Note that this does not extend
+ * {@link java.util.Iterator}.
+ */
+public interface ResetableIterator<T extends Writable> {
+
+  /**
+   * An iterator that never yields a value: {@code hasNext}, {@code next} and
+   * {@code replay} always return false, {@code reset}, {@code close} and
+   * {@code clear} do nothing, and {@code add} is rejected with an
+   * {@link UnsupportedOperationException}.
+   */
+  public static class EMPTY<U extends Writable> implements ResetableIterator<U> {
+    public boolean hasNext() {
+      return false;
+    }
+
+    public void reset() {
+    }
+
+    public void close() throws IOException {
+    }
+
+    public void clear() {
+    }
+
+    public boolean next(U val) throws IOException {
+      return false;
+    }
+
+    public boolean replay(U val) throws IOException {
+      return false;
+    }
+
+    public void add(U item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * True if a call to next may return a value. False positives are
+   * permitted, but false negatives are not.
+   */
+  public boolean hasNext();
+
+  /**
+   * Assign the next value to {@code val}. It is required that elements added
+   * to a ResetableIterator be returned in the same order after a call to
+   * {@link #reset} (FIFO).
+   * 
+   * Note that a call to this may fail for nested joins (i.e. more elements
+   * available, but none satisfying the constraints of the join)
+   */
+  public boolean next(T val) throws IOException;
+
+  /**
+   * Assign the last value returned to {@code val}.
+   */
+  public boolean replay(T val) throws IOException;
+
+  /**
+   * Set iterator to return to the start of its range. Must be called after
+   * calling {@link #add} to avoid a ConcurrentModificationException.
+   */
+  public void reset();
+
+  /**
+   * Add an element to the collection of elements to iterate over.
+   */
+  public void add(T item) throws IOException;
+
+  /**
+   * Close datasources and release resources. Calling methods on the iterator
+   * after calling close has undefined behavior.
+   */
+  // XXX is this necessary?
+  public void close() throws IOException;
+
+  /**
+   * Close datasources, but do not release internal resources. Calling this
+   * method should permit the object to be reused with a different datasource.
+   */
+  public void clear();
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class provides an implementation of ResetableIterator. This
+ * implementation uses a byte array to store elements added to it.
+ */
+public class StreamBackedIterator<X extends Writable> implements
+    ResetableIterator<X> {
+
+  /**
+   * In-memory input stream whose read position can be rewound to the first
+   * byte regardless of any mark set while reading.
+   */
+  private static class ReplayableByteInputStream extends ByteArrayInputStream {
+    public ReplayableByteInputStream(byte[] arr) {
+      super(arr);
+    }
+
+    /** Move the mark back to offset 0 and rewind the stream to it. */
+    public void resetStream() {
+      mark = 0;
+      reset();
+    }
+  }
+
+  // Write side: serialized elements accumulate here until reset() is called.
+  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
+  // Null once reset() has switched to reading; recreated by clear().
+  private DataOutputStream outfbuf = new DataOutputStream(outbuf);
+  // Read side: copy of outbuf taken by reset(); null until the first reset().
+  private ReplayableByteInputStream inbuf;
+  private DataInputStream infbuf;
+
+  public StreamBackedIterator() {
+  }
+
+  /**
+   * Return true if {@link #reset} has been called and unread bytes remain.
+   */
+  public boolean hasNext() {
+    return infbuf != null && inbuf.available() > 0;
+  }
+
+  /**
+   * Deserialize the next stored element into {@code val}. The position before
+   * the read is marked so that {@link #replay} can re-read the same element.
+   *
+   * @return true if an element was read, false if none remain
+   */
+  public boolean next(X val) throws IOException {
+    if (hasNext()) {
+      inbuf.mark(0); // marks the current position; the argument is unused
+      val.readFields(infbuf);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Rewind to the position marked by the last {@link #next} and deserialize
+   * that element into {@code val} again.
+   *
+   * @return true if an element was read; false if nothing is available,
+   *         including when {@link #reset} has never been called
+   */
+  public boolean replay(X val) throws IOException {
+    if (null == inbuf) {
+      // No read buffer exists before the first reset(); previously an NPE.
+      return false;
+    }
+    inbuf.reset();
+    if (0 == inbuf.available()) {
+      return false;
+    }
+    val.readFields(infbuf);
+    return true;
+  }
+
+  /**
+   * Switch to reading from the start. The first call after elements were
+   * added copies the written bytes into a fresh read buffer; later calls
+   * only rewind that buffer.
+   */
+  public void reset() {
+    if (null != outfbuf) {
+      inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
+      infbuf = new DataInputStream(inbuf);
+      outfbuf = null;
+    }
+    inbuf.resetStream();
+  }
+
+  /**
+   * Serialize {@code item} onto the end of the write buffer.
+   *
+   * @throws IllegalStateException if called after {@link #reset} without an
+   *           intervening {@link #clear}
+   */
+  public void add(X item) throws IOException {
+    if (null == outfbuf) {
+      // reset() discarded the writer; fail with a clear message, not an NPE.
+      throw new IllegalStateException(
+          "add() called after reset(); call clear() before adding again");
+    }
+    item.write(outfbuf);
+  }
+
+  /** Close whichever of the data streams currently exist. */
+  public void close() throws IOException {
+    if (null != infbuf) {
+      infbuf.close();
+    }
+    if (null != outfbuf) {
+      outfbuf.close();
+    }
+  }
+
+  /**
+   * Empty the write buffer and make the iterator writable again. Any existing
+   * read buffer is only rewound here; it is replaced by the next
+   * {@link #reset}.
+   */
+  public void clear() {
+    if (null != inbuf) {
+      inbuf.resetStream();
+    }
+    outbuf.reset();
+    outfbuf = new DataOutputStream(outbuf);
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
+ * 
+ * This is *not* a general-purpose tuple type. In almost all cases, users are
+ * encouraged to implement their own serializable types, which can perform
+ * better validation and provide more efficient encodings than this class is
+ * capable. TupleWritable relies on the join framework for type safety and
+ * assumes its instances will rarely be persisted, assumptions not only
+ * incompatible with, but contrary to the general case.
+ * 
+ * @see org.apache.hadoop.io.Writable
+ */
+public class TupleWritable implements Writable, Iterable<Writable> {
+
+  // Bitmask of populated slots: bit i is set when values[i] carries a value.
+  // A long holds 64 bits, so positions beyond 63 cannot be tracked distinctly.
+  private long written;
+  // Backing storage; stays null after the no-arg constructor until readFields.
+  private Writable[] values;
+
+  /**
+   * Create an empty tuple with no allocated storage for writables.
+   */
+  public TupleWritable() {
+  }
+
+  /**
+   * Initialize tuple with storage; unknown whether any of them contain
+   * &quot;written&quot; values.
+   */
+  public TupleWritable(Writable[] vals) {
+    written = 0L;
+    values = vals;
+  }
+
+  /**
+   * Return true if tuple has an element at the position provided.
+   */
+  public boolean has(int i) {
+    return 0 != ((1L << i) & written);
+  }
+
+  /**
+   * Get ith Writable from Tuple. The slot is returned whether or not it is
+   * marked as written; use {@link #has} to tell the two apart.
+   */
+  public Writable get(int i) {
+    return values[i];
+  }
+
+  /**
+   * The number of children in this Tuple.
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * Two tuples are equal when they have the same size, the same set of
+   * written positions, and equal values at every written position.
+   */
+  public boolean equals(Object other) {
+    if (other instanceof TupleWritable) {
+      TupleWritable that = (TupleWritable) other;
+      if (this.size() != that.size() || this.written != that.written) {
+        return false;
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (!has(i)) {
+          continue;
+        }
+        if (!values[i].equals(that.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Not intended for use: trips an assertion when assertions are enabled and
+   * otherwise returns the low 32 bits of the written-position bitmask.
+   */
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return (int) written;
+  }
+
+  /**
+   * Return an iterator over the elements in this tuple. Note that this doesn't
+   * flatten the tuple; one may receive tuples from this iterator. Only
+   * positions marked as written when the iterator is created are visited.
+   */
+  public Iterator<Writable> iterator() {
+    final TupleWritable t = this;
+    return new Iterator<Writable>() {
+      long i = written; // positions still to visit
+      long last = 0L; // one-bit mask of the position last returned; 0 if none
+
+      public boolean hasNext() {
+        return 0L != i;
+      }
+
+      public Writable next() {
+        last = Long.lowestOneBit(i);
+        if (0 == last) {
+          throw new NoSuchElementException();
+        }
+        i ^= last;
+        // last has exactly one bit set here, so the index is within 0..63
+        return t.get(Long.numberOfTrailingZeros(last));
+      }
+
+      public void remove() {
+        // Validate before mutating: the previous code flipped the bit first,
+        // which re-marked an already-removed position as written before
+        // throwing, and checked bit 0 when next() had not been called.
+        if (0L == last || 0L == (t.written & last)) {
+          throw new IllegalStateException("Attempt to remove non-existent val");
+        }
+        t.written &= ~last;
+      }
+    };
+  }
+
+  /**
+   * Convert Tuple to String as in the following.
+   * {@code [<child1>,<child2>,...,<childn>]} Positions that are not marked as
+   * written are rendered as empty strings.
+   */
+  public String toString() {
+    StringBuilder buf = new StringBuilder("[");
+    for (int i = 0; i < values.length; ++i) {
+      buf.append(has(i) ? values[i].toString() : "");
+      buf.append(",");
+    }
+    if (values.length != 0) {
+      // Overwrite the trailing comma with the closing bracket.
+      buf.setCharAt(buf.length() - 1, ']');
+    } else {
+      buf.append(']');
+    }
+    return buf.toString();
+  }
+
+  // Writable
+
+  /**
+   * Writes each Writable to <code>out</code>. TupleWritable format:
+   * {@code
+   *  <count><written><type1><type2>...<typen><obj1><obj2>...<objn>
+   * }
+   * where {@code written} is the bitmask of populated positions and an object
+   * is emitted only for positions marked as written.
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    WritableUtils.writeVLong(out, written);
+    for (int i = 0; i < values.length; ++i) {
+      Text.writeString(out, values[i].getClass().getName());
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        values[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * Replace the contents of this tuple with data in the format produced by
+   * {@link #write}. Every position gets a new instance of its recorded class;
+   * only positions marked as written are then populated from the stream.
+   *
+   * @throws IOException if a recorded class cannot be loaded or instantiated,
+   *           or if reading from <code>in</code> fails
+   */
+  @SuppressWarnings("unchecked")
+  // No static typeinfo on Tuples
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    values = new Writable[card];
+    written = WritableUtils.readVLong(in);
+    Class<? extends Writable>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        values[i] = cls[i].newInstance();
+        if (has(i)) {
+          values[i].readFields(in);
+        }
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    } catch (InstantiationException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    }
+  }
+
+  /**
+   * Record that the tuple contains an element at the position provided.
+   */
+  void setWritten(int i) {
+    written |= 1L << i;
+  }
+
+  /**
+   * Record that the tuple does not contain an element at the position provided.
+   */
+  void clearWritten(int i) {
+    written &= ~(1L << i);
+  }
+
+  /**
+   * Clear any record of which writables have been written to, without releasing
+   * storage.
+   */
+  void clearWritten() {
+    written = 0L;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java Mon Dec  2 10:41:32 2013
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Proxy class for a RecordReader participating in the join framework. This
+ * class keeps track of the &quot;head&quot; key-value pair for the provided
+ * RecordReader and keeps a store of values matching a key when this source is
+ * participating in a join.
+ */
+public class WrappedRecordReader<K extends WritableComparable, U extends Writable>
+    implements ComposableRecordReader<K, U> {
+
+  private boolean empty = false; // true once rr has reported no more pairs
+  private RecordReader<K, U> rr;
+  private int id; // index at which values will be inserted in collector
+
+  private K khead; // key at the top of this RR
+  private U vhead; // value assoc with khead
+  private WritableComparator cmp;
+
+  private ResetableIterator<U> vjoin; // values buffered by accept()
+
+  /**
+   * For a given RecordReader rr, occupy position id in collector. When cmpcl
+   * is null the comparator is looked up from the key class via
+   * WritableComparator.get; otherwise cmpcl is instantiated. The first
+   * key-value pair is read immediately so that the head is populated.
+   */
+  WrappedRecordReader(int id, RecordReader<K, U> rr,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    this.id = id;
+    this.rr = rr;
+    khead = rr.createKey();
+    vhead = rr.createValue();
+    try {
+      cmp = (null == cmpcl) ? WritableComparator.get(khead.getClass()) : cmpcl
+          .newInstance();
+    } catch (InstantiationException e) {
+      throw (IOException) new IOException().initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException) new IOException().initCause(e);
+    }
+    vjoin = new StreamBackedIterator<U>();
+    next();
+  }
+
+  /** {@inheritDoc} */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * Return the key at the head of this RR.
+   */
+  public K key() {
+    return khead;
+  }
+
+  /**
+   * Clone the key at the head of this RR into the object supplied.
+   */
+  public void key(K qkey) throws IOException {
+    WritableUtils.cloneInto(qkey, khead);
+  }
+
+  /**
+   * Return true unless the RR- including the k,v pair stored in this object-
+   * is exhausted.
+   */
+  public boolean hasNext() {
+    return !empty;
+  }
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  public void skip(K key) throws IOException {
+    if (hasNext()) {
+      while (cmp.compare(khead, key) <= 0 && next())
+        ;
+    }
+  }
+
+  /**
+   * Read the next k,v pair into the head of this object; return true iff a
+   * pair was read, i.e. the RR and this are not exhausted.
+   */
+  protected boolean next() throws IOException {
+    empty = !rr.next(khead, vhead);
+    return hasNext();
+  }
+
+  /**
+   * Add an iterator to the collector at the position occupied by this
+   * RecordReader over the values in this stream paired with the key provided
+   * (ie register a stream of values from this source matching K with a
+   * collector). The iterator is registered even when the head key does not
+   * match, in which case no values are added to it.
+   */
+  // JoinCollector comes from parent, which has no static type for the slot
+  // this sits in
+  @SuppressWarnings("unchecked")
+  public void accept(CompositeRecordReader.JoinCollector i, K key)
+      throws IOException {
+    vjoin.clear();
+    if (0 == cmp.compare(key, khead)) {
+      do {
+        vjoin.add(vhead);
+      } while (next() && 0 == cmp.compare(key, khead));
+    }
+    i.add(id, vjoin);
+  }
+
+  /**
+   * Write key-value pair at the head of this stream to the objects provided;
+   * get next key-value pair from proxied RR. Returns false, leaving the
+   * arguments untouched, when this reader is exhausted.
+   */
+  public boolean next(K key, U value) throws IOException {
+    if (hasNext()) {
+      WritableUtils.cloneInto(key, khead);
+      WritableUtils.cloneInto(value, vhead);
+      next();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Request new key from proxied RR.
+   */
+  public K createKey() {
+    return rr.createKey();
+  }
+
+  /**
+   * Request new value from proxied RR.
+   */
+  public U createValue() {
+    return rr.createValue();
+  }
+
+  /**
+   * Request progress from proxied RR.
+   */
+  public float getProgress() throws IOException {
+    return rr.getProgress();
+  }
+
+  /**
+   * Request position from proxied RR.
+   */
+  public long getPos() throws IOException {
+    return rr.getPos();
+  }
+
+  /**
+   * Forward close request to proxied RR.
+   */
+  public void close() throws IOException {
+    rr.close();
+  }
+
+  /**
+   * Implement Comparable contract (compare key at head of proxied RR with that
+   * of another).
+   */
+  public int compareTo(ComposableRecordReader<K, ?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Return true iff other is a ComposableRecordReader and compareTo(other)
+   * returns 0.
+   */
+  @SuppressWarnings("unchecked")
+  // Explicit type check prior to cast
+  public boolean equals(Object other) {
+    return other instanceof ComposableRecordReader
+        && 0 == compareTo((ComposableRecordReader) other);
+  }
+
+  /**
+   * Not intended for use: trips an assertion when assertions are enabled and
+   * otherwise returns the same constant for every instance.
+   */
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 42;
+  }
+
+}