You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/02 11:41:33 UTC
svn commit: r1546951 [1/2] - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/join/
core/src/test/java/org/apache/hama/bsp/
Author: millecker
Date: Mon Dec 2 10:41:32 2013
New Revision: 1546951
URL: http://svn.apache.org/r1546951
Log:
HAMA-774: CompositeInputFormat in Hama
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCompositeInputFormat.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1546951&r1=1546950&r2=1546951&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Dec 2 10:41:32 2013
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
NEW FEATURES
+ HAMA-774: CompositeInputFormat in Hama (Martin Illecker)
HAMA-815: Hama Pipes uses C++ templates (Martin Illecker)
BUG FIXES
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1546951&r1=1546950&r2=1546951&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Dec 2 10:41:32 2013
@@ -324,7 +324,8 @@ public class BSPJobClient extends Config
short replication = (short) job.getInt("bsp.submit.replication", 10);
// only create the splits if we have an input
- if (job.get("bsp.input.dir") != null) {
+ if ((job.get("bsp.input.dir") != null)
+ || (job.get("bsp.join.expr") != null)) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableInputFormat.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+
+/**
+ * An {@link InputFormat} whose readers can take part in a composite join.
+ * Implementors must hand back a {@link ComposableRecordReader} instead of a
+ * plain RecordReader.
+ *
+ * @param <K> key type produced by the reader
+ * @param <V> value type produced by the reader
+ */
+public interface ComposableInputFormat<K extends WritableComparable, V extends Writable>
+    extends InputFormat<K, V> {
+
+  /**
+   * Open a composable reader over the given split.
+   *
+   * @param split the input split to read
+   * @param job the job being run
+   * @return a reader usable as a child of a join
+   * @throws IOException if the reader cannot be created
+   */
+  ComposableRecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException;
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ComposableRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * A {@link RecordReader} exposing the extra operations a join needs: a fixed
+ * slot in the parent collector, a peekable head key, and the ability to skip
+ * ahead or hand matching records to a join collector.
+ *
+ * @param <K> key type shared with the other readers in the join
+ * @param <V> value type produced by this reader
+ */
+public interface ComposableRecordReader<K extends WritableComparable, V extends Writable>
+    extends RecordReader<K, V>, Comparable<ComposableRecordReader<K, ?>> {
+
+  /**
+   * @return the position in the collector this reader occupies
+   */
+  int id();
+
+  /**
+   * @return the key this reader would supply on a call to {@code next(K, V)}
+   */
+  K key();
+
+  /**
+   * Copy the key at the head of this reader into the supplied object.
+   *
+   * @param key destination that receives the copy
+   * @throws IOException if the copy fails
+   */
+  void key(K key) throws IOException;
+
+  /**
+   * Report whether the stream is non-empty. A {@code true} result does not
+   * guarantee that a following {@code next(K, V)} will succeed.
+   */
+  boolean hasNext();
+
+  /**
+   * Advance past every key-value pair whose key is less than or equal to
+   * {@code key}.
+   *
+   * @param key the inclusive upper bound of keys to discard
+   * @throws IOException if reading ahead fails
+   */
+  void skip(K key) throws IOException;
+
+  /**
+   * Register with {@code jc} every key-value pair from this reader whose key
+   * matches {@code key}.
+   *
+   * @param jc the collector receiving the matching values
+   * @param key the join key to match
+   * @throws IOException if reading fails
+   */
+  void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputFormat.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+
+/**
+ * An InputFormat that joins several inputs sharing a common key type and
+ * partitioning. The join is described by an expression read from the
+ * <tt>bsp.join.expr</tt> property (see {@link #setFormat(BSPJob)}).
+ *
+ * @param <K> key type shared by all joined inputs
+ */
+public class CompositeInputFormat<K extends WritableComparable> implements
+    ComposableInputFormat<K, TupleWritable> {
+
+  /**
+   * Matches configuration keys that declare user-defined join types, e.g.
+   * <tt>bsp.join.define.myjoin</tt>; group 1 captures the identifier.
+   * Compiled once: Pattern instances are immutable and thread-safe.
+   */
+  private static final Pattern USER_DEFINE_KEY = Pattern
+      .compile("^bsp\\.join\\.define\\.(\\w+)$");
+
+  // expression parse tree to which IF requests are proxied
+  private Parser.Node root;
+
+  public CompositeInputFormat() {
+  }
+
+  /**
+   * Interpret a given string as a composite expression.
+   * {@code
+   *   func  ::= <ident>([<func>,]*<func>)
+   *   func  ::= tbl(<class>,"<path>")
+   *   class ::= @see java.lang.Class#forName(java.lang.String)
+   *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
+   * }
+   * Reads the expression from the <tt>bsp.join.expr</tt> property and
+   * user-supplied join types from <tt>bsp.join.define.&lt;ident&gt;</tt>
+   * properties. Paths supplied to <tt>tbl</tt> are given as input paths to
+   * the InputFormat class listed.
+   *
+   * @param job the job whose configuration holds the join expression
+   * @throws IOException if a user-defined join type is rejected (errors from
+   *           Parser.parse may also surface here)
+   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
+   */
+  public void setFormat(BSPJob job) throws IOException {
+    addDefaults();
+    addUserIdentifiers(job);
+    root = Parser.parse(job.get("bsp.join.expr"), job);
+  }
+
+  /**
+   * Adds the default set of identifiers to the parser: the <tt>inner</tt>,
+   * <tt>outer</tt> and <tt>override</tt> joins and the <tt>tbl</tt> leaf.
+   */
+  protected void addDefaults() {
+    try {
+      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
+      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
+      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
+      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("FATAL: Failed to init defaults", e);
+    }
+  }
+
+  /**
+   * Inform the parser of user-defined types declared as
+   * <tt>bsp.join.define.&lt;ident&gt;</tt> = a ComposableRecordReader class.
+   *
+   * @throws IOException if the parser rejects a declared class (reported by
+   *           Parser as NoSuchMethodException)
+   */
+  private void addUserIdentifiers(BSPJob job) throws IOException {
+    for (Map.Entry<String, String> kv : job.getConfiguration()) {
+      Matcher m = USER_DEFINE_KEY.matcher(kv.getKey());
+      if (m.matches()) {
+        try {
+          Parser.CNode.addIdentifier(m.group(1), job.getConfiguration()
+              .getClass(m.group(0), null, ComposableRecordReader.class));
+        } catch (NoSuchMethodException e) {
+          throw new IOException("Invalid define for " + m.group(1), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a CompositeInputSplit from the child InputFormats by assigning the
+   * ith split from each child to the ith composite split.
+   * <p>
+   * Side effect: sets <tt>bsp.min.split.size</tt> to {@link Long#MAX_VALUE}
+   * in the job configuration, presumably so file-based children do not
+   * subdivide files and their i-th splits stay aligned (TODO confirm).
+   */
+  public InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException {
+    setFormat(job);
+    job.getConfiguration().setLong("bsp.min.split.size", Long.MAX_VALUE);
+    return root.getSplits(job, numBspTask);
+  }
+
+  /**
+   * Construct a CompositeRecordReader for the children of this InputFormat as
+   * defined in the init expression. The outermost join need only be composable,
+   * not necessarily a composite. Mandating TupleWritable isn't strictly
+   * correct.
+   */
+  @SuppressWarnings("unchecked")
+  // child types unknown
+  public ComposableRecordReader<K, TupleWritable> getRecordReader(
+      InputSplit split, BSPJob job) throws IOException {
+    setFormat(job);
+    return root.getRecordReader(split, job);
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given InputFormat
+   * class (inf) and path (p) return: {@code tbl(<inf>,"<p>")}
+   */
+  public static String compose(Class<? extends InputFormat> inf, String path) {
+    return compose(inf.getName(), path, new StringBuilder()).toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given operation
+   * (op), InputFormat class (inf) and set of paths (p) return:
+   * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>"))}
+   * <p>
+   * With no paths the result is {@code <op>()}.
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      String... path) {
+    final String infname = inf.getName();
+    StringBuilder ret = new StringBuilder().append(op).append('(');
+    for (int i = 0; i < path.length; ++i) {
+      if (i > 0) {
+        ret.append(',');
+      }
+      compose(infname, path[i], ret);
+    }
+    return ret.append(')').toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats. Given operation
+   * (op), InputFormat class (inf) and set of paths (p) return:
+   * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>"))}
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      Path... path) {
+    ArrayList<String> tmp = new ArrayList<String>(path.length);
+    for (Path p : path) {
+      tmp.add(p.toString());
+    }
+    return compose(op, inf, tmp.toArray(new String[tmp.size()]));
+  }
+
+  /**
+   * Appends {@code tbl(<inf>,"<path>")} to {@code sb} and returns it.
+   * NOTE(review): the path is not escaped; a path containing '"' would
+   * presumably confuse the Parser -- confirm.
+   */
+  private static StringBuilder compose(String inf, String path,
+      StringBuilder sb) {
+    return sb.append("tbl(").append(inf).append(",\"").append(path)
+        .append("\")");
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeInputSplit.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.InputSplit;
+
+/**
+ * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
+ * into this collection must have a public default constructor, because
+ * {@link #readFields(DataInput)} re-creates each child reflectively.
+ */
+public class CompositeInputSplit implements InputSplit {
+
+  // Number of child slots populated so far (by add() or readFields()).
+  private int fill = 0;
+  // Aggregate length of the populated children.
+  private long totsize = 0L;
+  private InputSplit[] splits;
+
+  /** Creates an empty split, intended to be populated by readFields(). */
+  public CompositeInputSplit() {
+  }
+
+  /** Creates a split with room for {@code capacity} children. */
+  public CompositeInputSplit(int capacity) {
+    splits = new InputSplit[capacity];
+  }
+
+  /**
+   * Add an InputSplit to this collection.
+   *
+   * @throws IOException If capacity was not specified during construction or
+   *           if capacity has been reached.
+   */
+  public void add(InputSplit s) throws IOException {
+    if (null == splits) {
+      throw new IOException("Uninitialized InputSplit");
+    }
+    if (fill == splits.length) {
+      throw new IOException("Too many splits");
+    }
+    splits[fill++] = s;
+    totsize += s.getLength();
+  }
+
+  /**
+   * Get ith child InputSplit.
+   */
+  public InputSplit get(int i) {
+    return splits[i];
+  }
+
+  /**
+   * Return the aggregate length of all child InputSplits currently added or
+   * deserialized by readFields().
+   */
+  public long getLength() throws IOException {
+    return totsize;
+  }
+
+  /**
+   * Get the length of ith child InputSplit.
+   */
+  public long getLength(int i) throws IOException {
+    return splits[i].getLength();
+  }
+
+  /**
+   * Collect a set of hosts from all populated child InputSplits. Slots that
+   * have not been filled yet are skipped instead of dereferenced.
+   */
+  public String[] getLocations() throws IOException {
+    HashSet<String> hosts = new HashSet<String>();
+    for (int i = 0; i < fill; ++i) {
+      String[] hints = splits[i].getLocations();
+      if (hints != null) {
+        for (String host : hints) {
+          hosts.add(host);
+        }
+      }
+    }
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
+  /**
+   * getLocations from ith InputSplit.
+   */
+  public String[] getLocation(int i) throws IOException {
+    return splits[i].getLocations();
+  }
+
+  /**
+   * Write splits in the following format.
+   * {@code
+   * <count><class1><class2>...<classn><split1><split2>...<splitn>
+   * }
+   * NOTE(review): every slot must be populated; an unfilled slot causes a
+   * NullPointerException here.
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, splits.length);
+    for (InputSplit s : splits) {
+      Text.writeString(out, s.getClass().getName());
+    }
+    for (InputSplit s : splits) {
+      s.write(out);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Restores the child count and aggregate length along with the children,
+   * so getLength() and getLocations() are valid after deserialization.
+   *
+   * @throws IOException If the child InputSplit cannot be read, typically for
+   *           failing access checks.
+   */
+  @SuppressWarnings("unchecked")
+  // Generic array assignment
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    if (splits == null || splits.length != card) {
+      splits = new InputSplit[card];
+    }
+    // Reset bookkeeping so it reflects what is read here, not stale state
+    // from an earlier add()/readFields() on this object.
+    fill = 0;
+    totsize = 0L;
+    Class<? extends InputSplit>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] = Class.forName(Text.readString(in))
+            .asSubclass(InputSplit.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        splits[i] = ReflectionUtils.newInstance(cls[i], null);
+        splits[i].readFields(in);
+        totsize += splits[i].getLength();
+        ++fill;
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed split init", e);
+    }
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/CompositeRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * A RecordReader that can effect joins of RecordReaders sharing a common key
+ * type and partitioning. Child readers are kept in a priority queue ordered by
+ * their current head key; the values of all children whose head matches the
+ * current join key are gathered by a {@link JoinCollector}, assembled into a
+ * {@link TupleWritable}, and filtered by the subclass-specific
+ * {@link #combine(Object[], TupleWritable)}.
+ *
+ * @param <K> key type shared by all child readers
+ * @param <V> value type of the child readers
+ * @param <X> value type of the iterator returned by {@link #getDelegate()}
+ */
+public abstract class CompositeRecordReader<K extends WritableComparable, // key type
+V extends Writable, // accepts RecordReader<K,V> as children
+X extends Writable> // emits Writables of this type
+ implements Configurable {
+
+ // Slot this reader occupies in its parent's JoinCollector; see id().
+ private int id;
+ private Configuration conf;
+ // Placeholder iterator installed in collector slots with no values for a key.
+ private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
+
+ // Orders children by head key. Built from cmpcl in the constructor, or
+ // lazily in add() from the first child's key class when cmpcl is null.
+ private WritableComparator cmp;
+ // Key class common to all children; resolved and cached by createKey().
+ private Class<? extends WritableComparable> keyclass;
+ // Children that still have input, ordered by their current head key.
+ private PriorityQueue<ComposableRecordReader<K, ?>> q;
+
+ // Accumulates, per join key, the values offered by matching children.
+ protected final JoinCollector jc;
+ // Child readers, indexed by their id().
+ protected final ComposableRecordReader<K, ? extends V>[] kids;
+
+ /**
+ * Decide whether the tuple assembled for the current key is reported.
+ * Invoked by JoinCollector#flush with {@code kids} as {@code srcs}.
+ *
+ * @param srcs the child readers, one per tuple position
+ * @param value the tuple assembled for the current key
+ * @return true if {@code value} should be reported, false to skip it
+ */
+ protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+ /**
+ * Create a RecordReader with <tt>capacity</tt> children to position
+ * <tt>id</tt> in the parent reader. The id of a root CompositeRecordReader is
+ * -1 by convention, but relying on this is not recommended.
+ * <p>
+ * If {@code cmpcl} is null, the comparator and the child heap are created
+ * on the first call to {@link #add(ComposableRecordReader)}.
+ */
+ @SuppressWarnings("unchecked")
+ // Generic array assignment
+ public CompositeRecordReader(int id, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ assert capacity > 0 : "Invalid capacity";
+ this.id = id;
+ if (null != cmpcl) {
+ cmp = ReflectionUtils.newInstance(cmpcl, null);
+ q = new PriorityQueue<ComposableRecordReader<K, ?>>(3,
+ new Comparator<ComposableRecordReader<K, ?>>() {
+ public int compare(ComposableRecordReader<K, ?> o1,
+ ComposableRecordReader<K, ?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
+ }
+ jc = new JoinCollector(capacity);
+ kids = new ComposableRecordReader[capacity];
+ }
+
+ /**
+ * Return the position in the collector this class occupies.
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Return sorted list of RecordReaders for this composite.
+ * (The queue holds only children that still have input; it is null until
+ * a comparator exists, i.e. before the first add() when cmpcl was null.)
+ */
+ protected PriorityQueue<ComposableRecordReader<K, ?>> getRecordReaderQueue() {
+ return q;
+ }
+
+ /**
+ * Return comparator defining the ordering for RecordReaders in this
+ * composite.
+ */
+ protected WritableComparator getComparator() {
+ return cmp;
+ }
+
+ /**
+ * Add a RecordReader to this collection. The id() of a RecordReader
+ * determines where in the Tuple its entry will appear. Adding RecordReaders
+ * with the same id has undefined behavior.
+ * <p>
+ * When no comparator class was given to the constructor, the first child
+ * added fixes the comparator via WritableComparator.get on its key class.
+ * Children without input are recorded in {@code kids} but not queued.
+ */
+ public void add(ComposableRecordReader<K, ? extends V> rr) throws IOException {
+ kids[rr.id()] = rr;
+ if (null == q) {
+ cmp = WritableComparator.get(rr.createKey().getClass());
+ q = new PriorityQueue<ComposableRecordReader<K, ?>>(3,
+ new Comparator<ComposableRecordReader<K, ?>>() {
+ public int compare(ComposableRecordReader<K, ?> o1,
+ ComposableRecordReader<K, ?> o2) {
+ return cmp.compare(o1.key(), o2.key());
+ }
+ });
+ }
+ if (rr.hasNext()) {
+ q.add(rr);
+ }
+ }
+
+ /**
+ * Collector for join values. This accumulates values for a given key from the
+ * child RecordReaders. If one or more child RR contain duplicate keys, this
+ * will emit the cross product of the associated values until exhausted.
+ */
+ class JoinCollector {
+ private K key;
+ // One iterator per child slot; EMPTY where a child has no values.
+ private ResetableIterator<X>[] iters;
+ // Highest slot still being advanced; negative once exhausted or cleared.
+ private int pos = -1;
+ // True until the first next() after reset(K).
+ private boolean first = true;
+
+ /**
+ * Construct a collector capable of handling the specified number of
+ * children.
+ */
+ @SuppressWarnings("unchecked")
+ // Generic array assignment
+ public JoinCollector(int card) {
+ iters = new ResetableIterator[card];
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i] = EMPTY;
+ }
+ }
+
+ /**
+ * Register a given iterator at position id.
+ */
+ public void add(int id, ResetableIterator<X> i) throws IOException {
+ iters[id] = i;
+ }
+
+ /**
+ * Return the key associated with this collection.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Codify the contents of the collector to be iterated over. When this is
+ * called, all RecordReaders registered for this key should have added
+ * ResetableIterators.
+ */
+ public void reset(K key) {
+ this.key = key;
+ first = true;
+ pos = iters.length - 1;
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].reset();
+ }
+ }
+
+ /**
+ * Clear all state information.
+ */
+ public void clear() {
+ key = null;
+ pos = -1;
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].clear();
+ iters[i] = EMPTY;
+ }
+ }
+
+ /**
+ * Returns false if exhausted or if reset(K) has not been called.
+ */
+ protected boolean hasNext() {
+ return !(pos < 0);
+ }
+
+ /**
+ * Populate Tuple from iterators. It should be the case that, given
+ * iterators i_1...i_n over values from sources s_1...s_n sharing key k,
+ * repeated calls to next should yield the cross product i_1 x ... x i_n.
+ * <p>
+ * The last slot varies fastest: each call advances the highest slot that
+ * can still advance, replays the current value of every lower slot, and
+ * rewinds and restarts every higher slot. Clears the collector and returns
+ * false once no slot can advance.
+ */
+ @SuppressWarnings("unchecked")
+ // No static typeinfo on Tuples
+ protected boolean next(TupleWritable val) throws IOException {
+ if (first) {
+ // First call after reset(): draw one value from every slot; pos ends
+ // at the highest slot that produced a value.
+ int i = -1;
+ for (pos = 0; pos < iters.length; ++pos) {
+ if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) {
+ i = pos;
+ val.setWritten(i);
+ }
+ }
+ pos = i;
+ first = false;
+ if (pos < 0) {
+ clear();
+ return false;
+ }
+ return true;
+ }
+ // Find the highest slot that can still advance.
+ while (0 <= pos
+ && !(iters[pos].hasNext() && iters[pos].next((X) val.get(pos)))) {
+ --pos;
+ }
+ if (pos < 0) {
+ clear();
+ return false;
+ }
+ val.setWritten(pos);
+ // Lower slots keep their current value.
+ for (int i = 0; i < pos; ++i) {
+ if (iters[i].replay((X) val.get(i))) {
+ val.setWritten(i);
+ }
+ }
+ // Higher slots start over from their first value.
+ while (pos + 1 < iters.length) {
+ ++pos;
+ iters[pos].reset();
+ if (iters[pos].hasNext() && iters[pos].next((X) val.get(pos))) {
+ val.setWritten(pos);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Replay the last Tuple emitted.
+ */
+ @SuppressWarnings("unchecked")
+ // No static typeinfo on Tuples
+ public boolean replay(TupleWritable val) throws IOException {
+ // The last emitted tuple might have drawn on an empty source;
+ // it can't be cleared prematurely, b/c there may be more duplicate
+ // keys in iterator positions < pos
+ assert !first;
+ boolean ret = false;
+ for (int i = 0; i < iters.length; ++i) {
+ if (iters[i].replay((X) val.get(i))) {
+ val.setWritten(i);
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Close all child iterators.
+ */
+ public void close() throws IOException {
+ for (int i = 0; i < iters.length; ++i) {
+ iters[i].close();
+ }
+ }
+
+ /**
+ * Write the next value into key, value as accepted by the operation
+ * associated with this set of RecordReaders.
+ * <p>
+ * Keeps generating tuples until combine(kids, value) accepts one (returns
+ * true) or the collector is exhausted (returns false).
+ */
+ public boolean flush(TupleWritable value) throws IOException {
+ while (hasNext()) {
+ value.clearWritten();
+ if (next(value) && combine(kids, value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Return the key for the current join or the value at the top of the
+ * RecordReader heap.
+ * NOTE(review): q is null if no child was added and no comparator class was
+ * supplied, in which case this throws NullPointerException.
+ */
+ public K key() {
+ if (jc.hasNext()) {
+ return jc.key();
+ }
+ if (!q.isEmpty()) {
+ return q.peek().key();
+ }
+ return null;
+ }
+
+ /**
+ * Clone the key at the top of this RR into the given object.
+ */
+ public void key(K key) throws IOException {
+ WritableUtils.cloneInto(key, key());
+ }
+
+ /**
+ * Return true if it is possible that this could emit more values.
+ */
+ public boolean hasNext() {
+ return jc.hasNext() || !q.isEmpty();
+ }
+
+ /**
+ * Pass skip key to child RRs.
+ * Only children whose head key is less than or equal to {@code key} are
+ * removed from the heap, told to skip, and re-queued if they still have
+ * input; the others are already past {@code key}.
+ */
+ public void skip(K key) throws IOException {
+ ArrayList<ComposableRecordReader<K, ?>> tmp = new ArrayList<ComposableRecordReader<K, ?>>();
+ while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
+ tmp.add(q.poll());
+ }
+ for (ComposableRecordReader<K, ?> rr : tmp) {
+ rr.skip(key);
+ if (rr.hasNext()) {
+ q.add(rr);
+ }
+ }
+ }
+
+ /**
+ * Obtain an iterator over the child RRs apropos of the value type ultimately
+ * emitted from this join.
+ */
+ protected abstract ResetableIterator<X> getDelegate();
+
+ /**
+ * If key provided matches that of this Composite, give JoinCollector iterator
+ * over values it may emit.
+ * <p>
+ * Note: the {@code jc} parameter is the parent's collector and shadows the
+ * field {@code this.jc}; fillJoinCollector populates this reader's own
+ * collector, and the iterator from getDelegate() is registered in the
+ * parent's collector at slot {@code id}. On a mismatch the parent slot
+ * receives the EMPTY placeholder.
+ */
+ @SuppressWarnings("unchecked")
+ // No values from static EMPTY class
+ public void accept(CompositeRecordReader.JoinCollector jc, K key)
+ throws IOException {
+ if (hasNext() && 0 == cmp.compare(key, key())) {
+ fillJoinCollector(createKey());
+ jc.add(id, getDelegate());
+ return;
+ }
+ jc.add(id, EMPTY);
+ }
+
+ /**
+ * For all child RRs offering the key provided, obtain an iterator at that
+ * position in the JoinCollector.
+ * <p>
+ * {@code iterkey} is overwritten with a copy of the head key of the heap.
+ * Every child whose head equals that key is polled, registers its values
+ * with this reader's collector via accept(), and is re-queued if it still
+ * has input.
+ */
+ protected void fillJoinCollector(K iterkey) throws IOException {
+ if (!q.isEmpty()) {
+ q.peek().key(iterkey);
+ while (0 == cmp.compare(q.peek().key(), iterkey)) {
+ ComposableRecordReader<K, ?> t = q.poll();
+ t.accept(jc, iterkey);
+ if (t.hasNext()) {
+ q.add(t);
+ } else if (q.isEmpty()) {
+ // Heap drained: stop before peek() returns null.
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Implement Comparable contract (compare key of join or head of heap with
+ * that of another).
+ */
+ public int compareTo(ComposableRecordReader<K, ?> other) {
+ return cmp.compare(key(), other.key());
+ }
+
+ /**
+ * Create a new key value common to all child RRs.
+ * The key class is checked against every child once and then cached.
+ *
+ * @throws ClassCastException if key classes differ.
+ */
+ @SuppressWarnings("unchecked")
+ // Explicit check for key class agreement
+ public K createKey() {
+ if (null == keyclass) {
+ final Class<?> cls = kids[0].createKey().getClass();
+ for (RecordReader<K, ? extends Writable> rr : kids) {
+ if (!cls.equals(rr.createKey().getClass())) {
+ throw new ClassCastException("Child key classes fail to agree");
+ }
+ }
+ keyclass = cls.asSubclass(WritableComparable.class);
+ }
+ return (K) ReflectionUtils.newInstance(keyclass, getConf());
+ }
+
+ /**
+ * Create a value to be used internally for joins.
+ * Each tuple position holds a fresh value from the matching child's
+ * createValue().
+ */
+ protected TupleWritable createInternalValue() {
+ Writable[] vals = new Writable[kids.length];
+ for (int i = 0; i < vals.length; ++i) {
+ vals[i] = kids[i].createValue();
+ }
+ return new TupleWritable(vals);
+ }
+
+ /**
+ * Unsupported (returns zero in all cases).
+ */
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ /**
+ * Close all child RRs.
+ * NOTE(review): iterates every slot of kids; a slot never filled via add()
+ * would cause a NullPointerException here.
+ */
+ public void close() throws IOException {
+ if (kids != null) {
+ for (RecordReader<K, ? extends Writable> rr : kids) {
+ rr.close();
+ }
+ }
+ if (jc != null) {
+ jc.close();
+ }
+ }
+
+ /**
+ * Report progress as the minimum of all child RR progress.
+ * NOTE(review): like close(), assumes every slot of kids has been filled.
+ */
+ public float getProgress() throws IOException {
+ float ret = 1.0f;
+ for (RecordReader<K, ? extends Writable> rr : kids) {
+ ret = Math.min(ret, rr.getProgress());
+ }
+ return ret;
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/InnerJoinRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Full inner join.
+ */
+public class InnerJoinRecordReader<K extends WritableComparable> extends
+ JoinRecordReader<K> {
+
+ InnerJoinRecordReader(int id, BSPJob job, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, job, capacity, cmpcl);
+ }
+
+ /**
+ * Return true iff the tuple is full (all data sources contain this key).
+ */
+ protected boolean combine(Object[] srcs, TupleWritable dst) {
+ assert srcs.length == dst.size();
+ for (int i = 0; i < srcs.length; ++i) {
+ if (!dst.has(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/JoinRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Base class for Composite joins returning Tuples of arbitrary Writables.
+ */
+public abstract class JoinRecordReader<K extends WritableComparable> extends
+ CompositeRecordReader<K, Writable, TupleWritable> implements
+ ComposableRecordReader<K, TupleWritable> {
+
+ public JoinRecordReader(int id, BSPJob job, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, capacity, cmpcl);
+ setConf(job.getConfiguration());
+ }
+
+ /**
+ * Emit the next set of key, value pairs as defined by the child RecordReaders
+ * and operation associated with this composite RR.
+ */
+ public boolean next(K key, TupleWritable value) throws IOException {
+ if (jc.flush(value)) {
+ WritableUtils.cloneInto(key, jc.key());
+ return true;
+ }
+ jc.clear();
+ K iterkey = createKey();
+ final PriorityQueue<ComposableRecordReader<K, ?>> q = getRecordReaderQueue();
+ while (!q.isEmpty()) {
+ fillJoinCollector(iterkey);
+ jc.reset(iterkey);
+ if (jc.flush(value)) {
+ WritableUtils.cloneInto(key, jc.key());
+ return true;
+ }
+ jc.clear();
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ public TupleWritable createValue() {
+ return createInternalValue();
+ }
+
+ /**
+ * Return an iterator wrapping the JoinCollector.
+ */
+ protected ResetableIterator<TupleWritable> getDelegate() {
+ return new JoinDelegationIterator();
+ }
+
+ /**
+ * Since the JoinCollector is effecting our operation, we need only provide an
+ * iterator proxy wrapping its operation.
+ */
+ protected class JoinDelegationIterator implements
+ ResetableIterator<TupleWritable> {
+
+ public boolean hasNext() {
+ return jc.hasNext();
+ }
+
+ public boolean next(TupleWritable val) throws IOException {
+ return jc.flush(val);
+ }
+
+ public boolean replay(TupleWritable val) throws IOException {
+ return jc.replay(val);
+ }
+
+ public void reset() {
+ jc.reset(jc.key());
+ }
+
+ public void add(TupleWritable item) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws IOException {
+ jc.close();
+ }
+
+ public void clear() {
+ jc.clear();
+ }
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/MultiFilterRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Base class for Composite join returning values derived from multiple sources,
+ * but generally not tuples.
+ */
+public abstract class MultiFilterRecordReader<K extends WritableComparable, V extends Writable>
+ extends CompositeRecordReader<K, V, V> implements
+ ComposableRecordReader<K, V> {
+
+ private Class<? extends Writable> valueclass;
+ private TupleWritable ivalue;
+
+ public MultiFilterRecordReader(int id, BSPJob job, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, capacity, cmpcl);
+ setConf(job.getConfiguration());
+ }
+
+ /**
+ * For each tuple emitted, return a value (typically one of the values in the
+ * tuple). Modifying the Writables in the tuple is permitted and unlikely to
+ * affect join behavior in most cases, but it is not recommended. It's safer
+ * to clone first.
+ */
+ protected abstract V emit(TupleWritable dst) throws IOException;
+
+ /**
+ * Default implementation offers {@link #emit} every Tuple from the collector
+ * (the outer join of child RRs).
+ */
+ protected boolean combine(Object[] srcs, TupleWritable dst) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(K key, V value) throws IOException {
+ if (jc.flush(ivalue)) {
+ WritableUtils.cloneInto(key, jc.key());
+ WritableUtils.cloneInto(value, emit(ivalue));
+ return true;
+ }
+ jc.clear();
+ K iterkey = createKey();
+ final PriorityQueue<ComposableRecordReader<K, ?>> q = getRecordReaderQueue();
+ while (!q.isEmpty()) {
+ fillJoinCollector(iterkey);
+ jc.reset(iterkey);
+ if (jc.flush(ivalue)) {
+ WritableUtils.cloneInto(key, jc.key());
+ WritableUtils.cloneInto(value, emit(ivalue));
+ return true;
+ }
+ jc.clear();
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ // Explicit check for value class agreement
+ public V createValue() {
+ if (null == valueclass) {
+ final Class<?> cls = kids[0].createValue().getClass();
+ for (RecordReader<K, ? extends V> rr : kids) {
+ if (!cls.equals(rr.createValue().getClass())) {
+ throw new ClassCastException("Child value classes fail to agree");
+ }
+ }
+ valueclass = cls.asSubclass(Writable.class);
+ ivalue = createInternalValue();
+ }
+ return (V) ReflectionUtils.newInstance(valueclass, null);
+ }
+
+ /**
+ * Return an iterator returning a single value from the tuple.
+ *
+ * @see MultiFilterDelegationIterator
+ */
+ protected ResetableIterator<V> getDelegate() {
+ return new MultiFilterDelegationIterator();
+ }
+
+ /**
+ * Proxy the JoinCollector, but include callback to emit.
+ */
+ protected class MultiFilterDelegationIterator implements ResetableIterator<V> {
+
+ public boolean hasNext() {
+ return jc.hasNext();
+ }
+
+ public boolean next(V val) throws IOException {
+ boolean ret;
+ if (ret = jc.flush(ivalue)) {
+ WritableUtils.cloneInto(val, emit(ivalue));
+ }
+ return ret;
+ }
+
+ public boolean replay(V val) throws IOException {
+ WritableUtils.cloneInto(val, emit(ivalue));
+ return true;
+ }
+
+ public void reset() {
+ jc.reset(jc.key());
+ }
+
+ public void add(V item) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws IOException {
+ jc.close();
+ }
+
+ public void clear() {
+ jc.clear();
+ }
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OuterJoinRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Full outer join.
+ */
+public class OuterJoinRecordReader<K extends WritableComparable> extends
+ JoinRecordReader<K> {
+
+ OuterJoinRecordReader(int id, BSPJob job, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, job, capacity, cmpcl);
+ }
+
+ /**
+ * Emit everything from the collector.
+ */
+ protected boolean combine(Object[] srcs, TupleWritable dst) {
+ assert srcs.length == dst.size();
+ return true;
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/OverrideRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * Prefer the "rightmost" data source for this key. For example,
+ * <tt>override(S1,S2,S3)</tt> will prefer values from S3 over S2, and values
+ * from S2 over S1 for all keys emitted from all sources.
+ */
+public class OverrideRecordReader<K extends WritableComparable, V extends Writable>
+ extends MultiFilterRecordReader<K, V> {
+
+ OverrideRecordReader(int id, BSPJob job, int capacity,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ super(id, job, capacity, cmpcl);
+ }
+
+ /**
+ * Emit the value with the highest position in the tuple.
+ */
+ @SuppressWarnings("unchecked")
+ // No static typeinfo on Tuples
+ protected V emit(TupleWritable dst) {
+ return (V) dst.iterator().next();
+ }
+
+ /**
+ * Instead of filling the JoinCollector with iterators from all data sources,
+ * fill only the rightmost for this key. This not only saves space by
+ * discarding the other sources, but it also emits the number of key-value
+ * pairs in the preferred RecordReader instead of repeating that stream n
+ * times, where n is the cardinality of the cross product of the discarded
+ * streams for the given key.
+ */
+ protected void fillJoinCollector(K iterkey) throws IOException {
+ final PriorityQueue<ComposableRecordReader<K, ?>> q = getRecordReaderQueue();
+ if (!q.isEmpty()) {
+ int highpos = -1;
+ ArrayList<ComposableRecordReader<K, ?>> list = new ArrayList<ComposableRecordReader<K, ?>>(
+ kids.length);
+ q.peek().key(iterkey);
+ final WritableComparator cmp = getComparator();
+ while (0 == cmp.compare(q.peek().key(), iterkey)) {
+ ComposableRecordReader<K, ?> t = q.poll();
+ if (-1 == highpos || list.get(highpos).id() < t.id()) {
+ highpos = list.size();
+ }
+ list.add(t);
+ if (q.isEmpty())
+ break;
+ }
+ ComposableRecordReader<K, ?> t = list.remove(highpos);
+ t.accept(jc, iterkey);
+ for (ComposableRecordReader<K, ?> rr : list) {
+ rr.skip(iterkey);
+ }
+ list.add(t);
+ for (ComposableRecordReader<K, ?> rr : list) {
+ if (rr.hasNext()) {
+ q.add(rr);
+ }
+ }
+ }
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/Parser.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.CharArrayReader;
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Very simple shift-reduce parser for join expressions.
+ *
+ * This should be sufficient for the user extension permitted now, but ought to
+ * be replaced with a parser generator if more complex grammars are supported.
+ * In particular, this "shift-reduce" parser has no states. Each set
+ * of formals requires a different internal node type, which is responsible for
+ * interpreting the list of tokens it receives. This is sufficient for the
+ * current grammar, but it has several annoying properties that might inhibit
+ * extension. In particular, parenthesis are always function calls; an algebraic
+ * or filter grammar would not only require a node type, but must also work
+ * around the internals of this parser.
+ *
+ * For most other cases, adding classes to the hierarchy- particularly by
+ * extending JoinRecordReader and MultiFilterRecordReader- is fairly
+ * straightforward. One need only override the relevant method(s) (usually only
+ * {@link CompositeRecordReader#combine}) and include a property to map its
+ * value to an identifier in the parser.
+ */
+public class Parser {
+ public enum TType {
+ CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM,
+ }
+
+ /**
+ * Tagged-union type for tokens from the join expression.
+ *
+ * @see Parser.TType
+ */
+ public static class Token {
+
+ private TType type;
+
+ Token(TType type) {
+ this.type = type;
+ }
+
+ public TType getType() {
+ return type;
+ }
+
+ public Node getNode() throws IOException {
+ throw new IOException("Expected nodetype");
+ }
+
+ public double getNum() throws IOException {
+ throw new IOException("Expected numtype");
+ }
+
+ public String getStr() throws IOException {
+ throw new IOException("Expected strtype");
+ }
+ }
+
+ public static class NumToken extends Token {
+ private double num;
+
+ public NumToken(double num) {
+ super(TType.NUM);
+ this.num = num;
+ }
+
+ public double getNum() {
+ return num;
+ }
+ }
+
+ public static class NodeToken extends Token {
+ private Node node;
+
+ NodeToken(Node node) {
+ super(TType.CIF);
+ this.node = node;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+ }
+
+ public static class StrToken extends Token {
+ private String str;
+
+ public StrToken(TType type, String str) {
+ super(type);
+ this.str = str;
+ }
+
+ public String getStr() {
+ return str;
+ }
+ }
+
+ /**
+ * Simple lexer wrapping a StreamTokenizer. This encapsulates the creation of
+ * tagged-union Tokens and initializes the SteamTokenizer.
+ */
+ private static class Lexer {
+
+ private StreamTokenizer tok;
+
+ Lexer(String s) {
+ tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
+ tok.quoteChar('"');
+ tok.parseNumbers();
+ tok.ordinaryChar(',');
+ tok.ordinaryChar('(');
+ tok.ordinaryChar(')');
+ tok.wordChars('$', '$');
+ tok.wordChars('_', '_');
+ }
+
+ Token next() throws IOException {
+ int type = tok.nextToken();
+ switch (type) {
+ case StreamTokenizer.TT_EOF:
+ case StreamTokenizer.TT_EOL:
+ return null;
+ case StreamTokenizer.TT_NUMBER:
+ return new NumToken(tok.nval);
+ case StreamTokenizer.TT_WORD:
+ return new StrToken(TType.IDENT, tok.sval);
+ case '"':
+ return new StrToken(TType.QUOT, tok.sval);
+ default:
+ switch (type) {
+ case ',':
+ return new Token(TType.COMMA);
+ case '(':
+ return new Token(TType.LPAREN);
+ case ')':
+ return new Token(TType.RPAREN);
+ default:
+ throw new IOException("Unexpected: " + type);
+ }
+ }
+ }
+ }
+
+ public abstract static class Node implements ComposableInputFormat {
+ /**
+ * Return the node type registered for the particular identifier. By
+ * default, this is a CNode for any composite node and a WNode for
+ * "wrapped" nodes. User nodes will likely be composite nodes.
+ *
+ * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class,
+ * java.lang.Class)
+ * @see CompositeInputFormat#setFormat(org.apache.hama.bsp.BSPJob)
+ */
+ static Node forIdent(String ident) throws IOException {
+ try {
+ if (!nodeCstrMap.containsKey(ident)) {
+ throw new IOException("No nodetype for " + ident);
+ }
+ return nodeCstrMap.get(ident).newInstance(ident);
+ } catch (IllegalAccessException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InstantiationException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InvocationTargetException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ }
+
+ private static final Class<?>[] ncstrSig = { String.class };
+ private static final Map<String, Constructor<? extends Node>> nodeCstrMap = new HashMap<String, Constructor<? extends Node>>();
+ protected static final Map<String, Constructor<? extends ComposableRecordReader>> rrCstrMap = new HashMap<String, Constructor<? extends ComposableRecordReader>>();
+
+ /**
+ * For a given identifier, add a mapping to the nodetype for the parse tree
+ * and to the ComposableRecordReader to be created, including the formals
+ * required to invoke the constructor. The nodetype and constructor
+ * signature should be filled in from the child node.
+ */
+ protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
+ Class<? extends Node> nodetype,
+ Class<? extends ComposableRecordReader> cl)
+ throws NoSuchMethodException {
+ Constructor<? extends Node> ncstr = nodetype
+ .getDeclaredConstructor(ncstrSig);
+ ncstr.setAccessible(true);
+ nodeCstrMap.put(ident, ncstr);
+ Constructor<? extends ComposableRecordReader> mcstr = cl
+ .getDeclaredConstructor(mcstrSig);
+ mcstr.setAccessible(true);
+ rrCstrMap.put(ident, mcstr);
+ }
+
+ // inst
+ protected int id = -1;
+ protected String ident;
+ protected Class<? extends WritableComparator> cmpcl;
+
+ protected Node(String ident) {
+ this.ident = ident;
+ }
+
+ protected void setID(int id) {
+ this.id = id;
+ }
+
+ protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+ this.cmpcl = cmpcl;
+ }
+
+ abstract void parse(List<Token> args, BSPJob job) throws IOException;
+ }
+
+ /**
+ * Nodetype in the parse tree for "wrapped" InputFormats.
+ */
+ static class WNode extends Node {
+ private static final Class<?>[] cstrSig = { Integer.TYPE,
+ RecordReader.class, Class.class };
+
+ static void addIdentifier(String ident,
+ Class<? extends ComposableRecordReader> cl)
+ throws NoSuchMethodException {
+ Node.addIdentifier(ident, cstrSig, WNode.class, cl);
+ }
+
+ private String indir;
+ private InputFormat inf;
+
+ public WNode(String ident) {
+ super(ident);
+ }
+
+ /**
+ * Let the first actual define the InputFormat and the second define the
+ * <tt>bsp.input.dir</tt> property.
+ */
+ public void parse(List<Token> ll, BSPJob job) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ Iterator<Token> i = ll.iterator();
+ while (i.hasNext()) {
+ Token t = i.next();
+ if (TType.COMMA.equals(t.getType())) {
+ try {
+ inf = (InputFormat) ReflectionUtils.newInstance(job
+ .getConfiguration().getClassByName(sb.toString()), job
+ .getConfiguration());
+ } catch (ClassNotFoundException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (IllegalArgumentException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ break;
+ }
+ sb.append(t.getStr());
+ }
+ if (!i.hasNext()) {
+ throw new IOException("Parse error");
+ }
+ Token t = i.next();
+ if (!TType.QUOT.equals(t.getType())) {
+ throw new IOException("Expected quoted string");
+ }
+ indir = t.getStr();
+ // no check for ll.isEmpty() to permit extension
+ }
+
+ private BSPJob getConf(BSPJob job) throws IOException {
+ BSPJob bspJob = new BSPJob((HamaConfiguration) job.getConfiguration());
+ FileInputFormat.setInputPaths(bspJob, indir);
+ return bspJob;
+ }
+
+ public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+ return inf.getSplits(getConf(job), numSplits);
+ }
+
+ public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job)
+ throws IOException {
+ try {
+ if (!rrCstrMap.containsKey(ident)) {
+ throw new IOException("No RecordReader for " + ident);
+ }
+ return rrCstrMap.get(ident).newInstance(id,
+ inf.getRecordReader(split, getConf(job)), cmpcl);
+ } catch (IllegalAccessException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InstantiationException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InvocationTargetException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ }
+
+ public String toString() {
+ return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
+ }
+ }
+
+ /**
+ * Internal nodetype for "composite" InputFormats.
+ */
+ static class CNode extends Node {
+
+ private static final Class<?>[] cstrSig = { Integer.TYPE, BSPJob.class,
+ Integer.TYPE, Class.class };
+
+ static void addIdentifier(String ident,
+ Class<? extends ComposableRecordReader> cl)
+ throws NoSuchMethodException {
+ Node.addIdentifier(ident, cstrSig, CNode.class, cl);
+ }
+
+ // inst
+ private ArrayList<Node> kids = new ArrayList<Node>();
+
+ public CNode(String ident) {
+ super(ident);
+ }
+
+ public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+ super.setKeyComparator(cmpcl);
+ for (Node n : kids) {
+ n.setKeyComparator(cmpcl);
+ }
+ }
+
+ /**
+ * Combine InputSplits from child InputFormats into a
+ * {@link CompositeInputSplit}.
+ */
+ public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+ InputSplit[][] splits = new InputSplit[kids.size()][];
+ for (int i = 0; i < kids.size(); ++i) {
+ final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+ if (null == tmp) {
+ throw new IOException("Error gathering splits from child RReader");
+ }
+ if (i > 0 && splits[i - 1].length != tmp.length) {
+ throw new IOException("Inconsistent split cardinality from child "
+ + i + " (" + splits[i - 1].length + "/" + tmp.length + ")");
+ }
+ splits[i] = tmp;
+ }
+ final int size = splits[0].length;
+ CompositeInputSplit[] ret = new CompositeInputSplit[size];
+ for (int i = 0; i < size; ++i) {
+ ret[i] = new CompositeInputSplit(splits.length);
+ for (int j = 0; j < splits.length; ++j) {
+ ret[i].add(splits[j][i]);
+ }
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("unchecked")
+ // child types unknowable
+ public ComposableRecordReader getRecordReader(InputSplit split, BSPJob job)
+ throws IOException {
+ if (!(split instanceof CompositeInputSplit)) {
+ throw new IOException("Invalid split type:"
+ + split.getClass().getName());
+ }
+ final CompositeInputSplit spl = (CompositeInputSplit) split;
+ final int capacity = kids.size();
+ CompositeRecordReader ret = null;
+ try {
+ if (!rrCstrMap.containsKey(ident)) {
+ throw new IOException("No RecordReader for " + ident);
+ }
+ ret = (CompositeRecordReader) rrCstrMap.get(ident).newInstance(id, job,
+ capacity, cmpcl);
+ } catch (IllegalAccessException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InstantiationException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (InvocationTargetException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ for (int i = 0; i < capacity; ++i) {
+ ret.add(kids.get(i).getRecordReader(spl.get(i), job));
+ }
+ return (ComposableRecordReader) ret;
+ }
+
+ /**
+ * Parse a list of comma-separated nodes.
+ */
+ public void parse(List<Token> args, BSPJob job) throws IOException {
+ ListIterator<Token> i = args.listIterator();
+ while (i.hasNext()) {
+ Token t = i.next();
+ t.getNode().setID(i.previousIndex() >> 1);
+ kids.add(t.getNode());
+ if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
+ throw new IOException("Expected ','");
+ }
+ }
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ident + "(");
+ for (Node n : kids) {
+ sb.append(n.toString() + ",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');
+ return sb.toString();
+ }
+ }
+
+ private static Token reduce(Stack<Token> st, BSPJob job) throws IOException {
+ LinkedList<Token> args = new LinkedList<Token>();
+ while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
+ args.addFirst(st.pop());
+ }
+ if (st.isEmpty()) {
+ throw new IOException("Unmatched ')'");
+ }
+ st.pop();
+ if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
+ throw new IOException("Identifier expected");
+ }
+ Node n = Node.forIdent(st.pop().getStr());
+ n.parse(args, job);
+ return new NodeToken(n);
+ }
+
+ /**
+ * Given an expression and an optional comparator, build a tree of
+ * InputFormats using the comparator to sort keys.
+ */
+ static Node parse(String expr, BSPJob job) throws IOException {
+ if (null == expr) {
+ throw new IOException("Expression is null");
+ }
+ Class<? extends WritableComparator> cmpcl = job.getConfiguration()
+ .getClass("bsp.join.keycomparator", null, WritableComparator.class);
+ Lexer lex = new Lexer(expr);
+ Stack<Token> st = new Stack<Token>();
+ Token tok;
+ while ((tok = lex.next()) != null) {
+ if (TType.RPAREN.equals(tok.getType())) {
+ st.push(reduce(st, job));
+ } else {
+ st.push(tok);
+ }
+ }
+ if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
+ Node ret = st.pop().getNode();
+ if (cmpcl != null) {
+ ret.setKeyComparator(cmpcl);
+ }
+ return ret;
+ }
+ throw new IOException("Missing ')'");
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/ResetableIterator.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This defines an interface to a stateful Iterator that can replay elements
+ * added to it directly. Note that this does not extend
+ * {@link java.util.Iterator}.
+ */
+public interface ResetableIterator<T extends Writable> {
+
+ public static class EMPTY<U extends Writable> implements ResetableIterator<U> {
+ public boolean hasNext() {
+ return false;
+ }
+
+ public void reset() {
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void clear() {
+ }
+
+ public boolean next(U val) throws IOException {
+ return false;
+ }
+
+ public boolean replay(U val) throws IOException {
+ return false;
+ }
+
+ public void add(U item) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * True if a call to next may return a value. This is permitted false
+ * positives, but not false negatives.
+ */
+ public boolean hasNext();
+
+ /**
+ * Assign next value to actual. It is required that elements added to a
+ * ResetableIterator be returned in the same order after a call to
+ * {@link #reset} (FIFO).
+ *
+ * Note that a call to this may fail for nested joins (i.e. more elements
+ * available, but none satisfying the constraints of the join)
+ */
+ public boolean next(T val) throws IOException;
+
+ /**
+ * Assign last value returned to actual.
+ */
+ public boolean replay(T val) throws IOException;
+
+ /**
+ * Set iterator to return to the start of its range. Must be called after
+ * calling {@link #add} to avoid a ConcurrentModificationException.
+ */
+ public void reset();
+
+ /**
+ * Add an element to the collection of elements to iterate over.
+ */
+ public void add(T item) throws IOException;
+
+ /**
+ * Close datasources and release resources. Calling methods on the iterator
+ * after calling close has undefined behavior.
+ */
+ // XXX is this necessary?
+ public void close() throws IOException;
+
+ /**
+ * Close datasources, but do not release internal resources. Calling this
+ * method should permit the object to be reused with a different datasource.
+ */
+ public void clear();
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/StreamBackedIterator.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class provides an implementation of ResetableIterator. This
+ * implementation uses a byte array to store elements added to it.
+ */
+public class StreamBackedIterator<X extends Writable> implements
+ ResetableIterator<X> {
+
+ private static class ReplayableByteInputStream extends ByteArrayInputStream {
+ public ReplayableByteInputStream(byte[] arr) {
+ super(arr);
+ }
+
+ public void resetStream() {
+ mark = 0;
+ reset();
+ }
+ }
+
+ private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
+ private DataOutputStream outfbuf = new DataOutputStream(outbuf);
+ private ReplayableByteInputStream inbuf;
+ private DataInputStream infbuf;
+
+ public StreamBackedIterator() {
+ }
+
+ public boolean hasNext() {
+ return infbuf != null && inbuf.available() > 0;
+ }
+
+ public boolean next(X val) throws IOException {
+ if (hasNext()) {
+ inbuf.mark(0);
+ val.readFields(infbuf);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean replay(X val) throws IOException {
+ inbuf.reset();
+ if (0 == inbuf.available())
+ return false;
+ val.readFields(infbuf);
+ return true;
+ }
+
+ public void reset() {
+ if (null != outfbuf) {
+ inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
+ infbuf = new DataInputStream(inbuf);
+ outfbuf = null;
+ }
+ inbuf.resetStream();
+ }
+
+ public void add(X item) throws IOException {
+ item.write(outfbuf);
+ }
+
+ public void close() throws IOException {
+ if (null != infbuf)
+ infbuf.close();
+ if (null != outfbuf)
+ outfbuf.close();
+ }
+
+ public void clear() {
+ if (null != inbuf)
+ inbuf.resetStream();
+ outbuf.reset();
+ outfbuf = new DataOutputStream(outbuf);
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/TupleWritable.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
+ *
+ * This is *not* a general-purpose tuple type. In almost all cases, users are
+ * encouraged to implement their own serializable types, which can perform
+ * better validation and provide more efficient encodings than this class is
+ * capable. TupleWritable relies on the join framework for type safety and
+ * assumes its instances will rarely be persisted, assumptions not only
+ * incompatible with, but contrary to the general case.
+ *
+ * @see org.apache.hadoop.io.Writable
+ */
+public class TupleWritable implements Writable, Iterable<Writable> {
+
+ private long written;
+ private Writable[] values;
+
+ /**
+ * Create an empty tuple with no allocated storage for writables.
+ */
+ public TupleWritable() {
+ }
+
+ /**
+ * Initialize tuple with storage; unknown whether any of them contain
+ * "written" values.
+ */
+ public TupleWritable(Writable[] vals) {
+ written = 0L;
+ values = vals;
+ }
+
+ /**
+ * Return true if tuple has an element at the position provided.
+ */
+ public boolean has(int i) {
+ return 0 != ((1L << i) & written);
+ }
+
+ /**
+ * Get ith Writable from Tuple.
+ */
+ public Writable get(int i) {
+ return values[i];
+ }
+
+ /**
+ * The number of children in this Tuple.
+ */
+ public int size() {
+ return values.length;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean equals(Object other) {
+ if (other instanceof TupleWritable) {
+ TupleWritable that = (TupleWritable) other;
+ if (this.size() != that.size() || this.written != that.written) {
+ return false;
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (!has(i))
+ continue;
+ if (!values[i].equals(that.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return (int) written;
+ }
+
+ /**
+ * Return an iterator over the elements in this tuple. Note that this doesn't
+ * flatten the tuple; one may receive tuples from this iterator.
+ */
+ public Iterator<Writable> iterator() {
+ final TupleWritable t = this;
+ return new Iterator<Writable>() {
+ long i = written;
+ long last = 0L;
+
+ public boolean hasNext() {
+ return 0L != i;
+ }
+
+ public Writable next() {
+ last = Long.lowestOneBit(i);
+ if (0 == last)
+ throw new NoSuchElementException();
+ i ^= last;
+ // numberOfTrailingZeros rtn 64 if lsb set
+ return t.get(Long.numberOfTrailingZeros(last) % 64);
+ }
+
+ public void remove() {
+ t.written ^= last;
+ if (t.has(Long.numberOfTrailingZeros(last))) {
+ throw new IllegalStateException("Attempt to remove non-existent val");
+ }
+ }
+ };
+ }
+
+ /**
+ * Convert Tuple to String as in the following.
+ * <tt>[<child1>,<child2>,...,<childn>]</tt>
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer("[");
+ for (int i = 0; i < values.length; ++i) {
+ buf.append(has(i) ? values[i].toString() : "");
+ buf.append(",");
+ }
+ if (values.length != 0)
+ buf.setCharAt(buf.length() - 1, ']');
+ else
+ buf.append(']');
+ return buf.toString();
+ }
+
+ // Writable
+
+ /**
+ * Writes each Writable to <code>out</code>. TupleWritable format:
+ * {@code
+ * <count><type1><type2>...<typen><obj1><obj2>...<objn>
+ * }
+ */
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, values.length);
+ WritableUtils.writeVLong(out, written);
+ for (int i = 0; i < values.length; ++i) {
+ Text.writeString(out, values[i].getClass().getName());
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (has(i)) {
+ values[i].write(out);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ // No static typeinfo on Tuples
+ public void readFields(DataInput in) throws IOException {
+ int card = WritableUtils.readVInt(in);
+ values = new Writable[card];
+ written = WritableUtils.readVLong(in);
+ Class<? extends Writable>[] cls = new Class[card];
+ try {
+ for (int i = 0; i < card; ++i) {
+ cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+ }
+ for (int i = 0; i < card; ++i) {
+ values[i] = cls[i].newInstance();
+ if (has(i)) {
+ values[i].readFields(in);
+ }
+ }
+ } catch (ClassNotFoundException e) {
+ throw (IOException) new IOException("Failed tuple init").initCause(e);
+ } catch (IllegalAccessException e) {
+ throw (IOException) new IOException("Failed tuple init").initCause(e);
+ } catch (InstantiationException e) {
+ throw (IOException) new IOException("Failed tuple init").initCause(e);
+ }
+ }
+
+ /**
+ * Record that the tuple contains an element at the position provided.
+ */
+ void setWritten(int i) {
+ written |= 1L << i;
+ }
+
+ /**
+ * Record that the tuple does not contain an element at the position provided.
+ */
+ void clearWritten(int i) {
+ written &= -1 ^ (1L << i);
+ }
+
+ /**
+ * Clear any record of which writables have been written to, without releasing
+ * storage.
+ */
+ void clearWritten() {
+ written = 0L;
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java?rev=1546951&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/join/WrappedRecordReader.java Mon Dec 2 10:41:32 2013
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.RecordReader;
+
+/**
+ * Proxy class for a RecordReader participating in the join framework. This
+ * class keeps track of the "head" key-value pair for the provided
+ * RecordReader and keeps a store of values matching a key when this source is
+ * participating in a join.
+ */
+public class WrappedRecordReader<K extends WritableComparable, U extends Writable>
+ implements ComposableRecordReader<K, U> {
+
+ private boolean empty = false;
+ private RecordReader<K, U> rr;
+ private int id; // index at which values will be inserted in collector
+
+ private K khead; // key at the top of this RR
+ private U vhead; // value assoc with khead
+ private WritableComparator cmp;
+
+ private ResetableIterator<U> vjoin;
+
+ /**
+ * For a given RecordReader rr, occupy position id in collector.
+ */
+ WrappedRecordReader(int id, RecordReader<K, U> rr,
+ Class<? extends WritableComparator> cmpcl) throws IOException {
+ this.id = id;
+ this.rr = rr;
+ khead = rr.createKey();
+ vhead = rr.createValue();
+ try {
+ cmp = (null == cmpcl) ? WritableComparator.get(khead.getClass()) : cmpcl
+ .newInstance();
+ } catch (InstantiationException e) {
+ throw (IOException) new IOException().initCause(e);
+ } catch (IllegalAccessException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ vjoin = new StreamBackedIterator<U>();
+ next();
+ }
+
+ /** {@inheritDoc} */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * Return the key at the head of this RR.
+ */
+ public K key() {
+ return khead;
+ }
+
+ /**
+ * Clone the key at the head of this RR into the object supplied.
+ */
+ public void key(K qkey) throws IOException {
+ WritableUtils.cloneInto(qkey, khead);
+ }
+
+ /**
+ * Return true if the RR- including the k,v pair stored in this object- is
+ * exhausted.
+ */
+ public boolean hasNext() {
+ return !empty;
+ }
+
+ /**
+ * Skip key-value pairs with keys less than or equal to the key provided.
+ */
+ public void skip(K key) throws IOException {
+ if (hasNext()) {
+ while (cmp.compare(khead, key) <= 0 && next())
+ ;
+ }
+ }
+
+ /**
+ * Read the next k,v pair into the head of this object; return true iff the RR
+ * and this are exhausted.
+ */
+ protected boolean next() throws IOException {
+ empty = !rr.next(khead, vhead);
+ return hasNext();
+ }
+
+ /**
+ * Add an iterator to the collector at the position occupied by this
+ * RecordReader over the values in this stream paired with the key provided
+ * (ie register a stream of values from this source matching K with a
+ * collector).
+ */
+ // JoinCollector comes from parent, which has
+ @SuppressWarnings("unchecked")
+ // no static type for the slot this sits in
+ public void accept(CompositeRecordReader.JoinCollector i, K key)
+ throws IOException {
+ vjoin.clear();
+ if (0 == cmp.compare(key, khead)) {
+ do {
+ vjoin.add(vhead);
+ } while (next() && 0 == cmp.compare(key, khead));
+ }
+ i.add(id, vjoin);
+ }
+
+ /**
+ * Write key-value pair at the head of this stream to the objects provided;
+ * get next key-value pair from proxied RR.
+ */
+ public boolean next(K key, U value) throws IOException {
+ if (hasNext()) {
+ WritableUtils.cloneInto(key, khead);
+ WritableUtils.cloneInto(value, vhead);
+ next();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Request new key from proxied RR.
+ */
+ public K createKey() {
+ return rr.createKey();
+ }
+
+ /**
+ * Request new value from proxied RR.
+ */
+ public U createValue() {
+ return rr.createValue();
+ }
+
+ /**
+ * Request progress from proxied RR.
+ */
+ public float getProgress() throws IOException {
+ return rr.getProgress();
+ }
+
+ /**
+ * Request position from proxied RR.
+ */
+ public long getPos() throws IOException {
+ return rr.getPos();
+ }
+
+ /**
+ * Forward close request to proxied RR.
+ */
+ public void close() throws IOException {
+ rr.close();
+ }
+
+ /**
+ * Implement Comparable contract (compare key at head of proxied RR with that
+ * of another).
+ */
+ public int compareTo(ComposableRecordReader<K, ?> other) {
+ return cmp.compare(key(), other.key());
+ }
+
+ /**
+ * Return true iff compareTo(other) retn true.
+ */
+ @SuppressWarnings("unchecked")
+ // Explicit type check prior to cast
+ public boolean equals(Object other) {
+ return other instanceof ComposableRecordReader
+ && 0 == compareTo((ComposableRecordReader) other);
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42;
+ }
+
+}