You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:30 UTC
[28/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
new file mode 100644
index 0000000..a3d30d2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an inner join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+ private transient K lastKey;
+ private transient List<U> leftValues;
+
+ public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
+ super(keyType, leftValueType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initialize() {
+ super.initialize();
+ lastKey = null;
+ this.leftValues = Lists.newArrayList();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ if (!key.equals(lastKey)) {
+ lastKey = keyType.getDetachedValue(key);
+ leftValues.clear();
+ }
+ if (id == 0) { // from left
+ for (Pair<U, V> pair : pairs) {
+ if (pair.first() != null)
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
+ }
+ } else { // from right
+ for (Pair<U, V> pair : pairs) {
+ for (U u : leftValues) {
+ emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getJoinType() {
+ return "innerJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
new file mode 100644
index 0000000..99aea5a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Base {@link org.apache.crunch.DoFn} for performing joins over grouped input.
+ * <p>
+ * Each input record carries a (key, side id) pair together with the group of values for that
+ * key and side. {@link #process} unpacks the record and delegates to {@link #join}, which
+ * concrete join types implement.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public abstract class JoinFn<K, U, V> extends
+    DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+
+  /** PType of the join key, used to make detached (deep) copies of keys that are retained. */
+  protected PType<K> keyType;
+  /** PType of the left-side values, used to make detached copies of buffered left values. */
+  protected PType<U> leftValueType;
+
+  /**
+   * Creates a join function that can deep-copy keys and left-side values.
+   *
+   * @param keyType The PType of the value used as the key of the join
+   * @param leftValueType The PType of the value type of the left side of the join
+   */
+  public JoinFn(PType<K> keyType, PType<U> leftValueType) {
+    this.keyType = keyType;
+    this.leftValueType = leftValueType;
+  }
+
+  /** Initializes the key and left-value PTypes with this function's configuration. */
+  @Override
+  public void initialize() {
+    keyType.initialize(getConfiguration());
+    leftValueType.initialize(getConfiguration());
+  }
+
+  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
+  public abstract String getJoinType();
+
+  /**
+   * Joins one group of values.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter);
+
+  /**
+   * Unpacks the (key, side id) pair and the value group, then hands them to {@link #join}.
+   *
+   * @param input The input record.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    Pair<K, Integer> keyAndSide = input.first();
+    join(keyAndSide.first(), keyAndSide.second(), input.second(), emitter);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
new file mode 100644
index 0000000..6efeccb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Utilities that are useful in joining multiple data sets via a MapReduce.
+ * <p>
+ * The partitioners and grouping comparators here look only at the first element of the
+ * composite map output key (the join key). Records from both sides of a join that share a join
+ * key are therefore partitioned and grouped together.
+ */
+public class JoinUtils {
+
+  /**
+   * Returns the partitioner for join keys of the given type family. The Writable family gets
+   * {@link TupleWritablePartitioner}; every other family gets {@link AvroIndexedRecordPartitioner}.
+   */
+  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritablePartitioner.class;
+    } else {
+      return AvroIndexedRecordPartitioner.class;
+    }
+  }
+
+  /**
+   * Returns the grouping comparator for join keys of the given type family. The Writable family
+   * gets {@link TupleWritableComparator}; every other family gets
+   * {@link AvroPairGroupingComparator}.
+   */
+  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritableComparator.class;
+    } else {
+      return AvroPairGroupingComparator.class;
+    }
+  }
+
+  /** Partitions {@link TupleWritable} keys on the hash code of their first element. */
+  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
+    @Override
+    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
+      // The mask is what guarantees a non-negative value: Math.abs(Integer.MIN_VALUE) is negative.
+      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+
+  /**
+   * Groups {@link TupleWritable} keys by their first element only.
+   * <p>
+   * Raw comparisons deserialize into reused instance fields, so one instance must not be used
+   * from several threads at once.
+   */
+  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
+
+    private DataInputBuffer buffer = new DataInputBuffer();
+    private TupleWritable key1 = new TupleWritable();
+    private TupleWritable key2 = new TupleWritable();
+
+    @Override
+    public int compare(TupleWritable o1, TupleWritable o2) {
+      return ((WritableComparable) o1.get(0)).compareTo((WritableComparable) o2.get(0));
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        buffer.reset(b1, s1, l1);
+        key1.readFields(buffer);
+
+        buffer.reset(b2, s2, l2);
+        key2.readFields(buffer);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      return compare(key1, key2);
+    }
+  }
+
+  /** Partitions Avro keys on the hash code of the first field of the key record. */
+  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
+    @Override
+    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
+      IndexedRecord record = (IndexedRecord) key.datum();
+      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+
+  /**
+   * Groups Avro map output keys by the first field of the key record (the join key).
+   * <p>
+   * The comparison schema is the schema of that first field, taken from the job's map output
+   * schema in {@link #setConf}.
+   */
+  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
+    private Schema schema;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
+        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
+        schema = keySchema.getFields().get(0).schema();
+      }
+    }
+
+    @Override
+    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+      // 'schema' describes only the key record's first field, so compare that field rather than
+      // the whole record. This matches both the byte-level compare and the partitioner.
+      return ReflectData.get().compare(joinKey(x.datum()), joinKey(y.datum()), schema);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      // Avro encodes record fields in order, so reading with the first field's schema examines
+      // only the join key at the start of each serialized key.
+      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
+    }
+
+    /** Returns the first field of an indexed-record key, or the datum itself otherwise. */
+    private static Object joinKey(Object datum) {
+      return datum instanceof IndexedRecord ? ((IndexedRecord) datum).get(0) : datum;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
new file mode 100644
index 0000000..731c496
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an left outer join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+ private transient int lastId;
+ private transient K lastKey;
+ private transient List<U> leftValues;
+
+ public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+ super(keyType, leftValueType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initialize() {
+ super.initialize();
+ lastId = 1;
+ lastKey = null;
+ this.leftValues = Lists.newArrayList();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ if (!key.equals(lastKey)) {
+ // Make sure that left side always gets emitted.
+ if (0 == lastId) {
+ for (U u : leftValues) {
+ emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+ }
+ }
+ lastKey = keyType.getDetachedValue(key);
+ leftValues.clear();
+ }
+ if (id == 0) {
+ for (Pair<U, V> pair : pairs) {
+ if (pair.first() != null)
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
+ }
+ } else {
+ for (Pair<U, V> pair : pairs) {
+ for (U u : leftValues) {
+ emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+ }
+ }
+ }
+
+ lastId = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+ if (0 == lastId) {
+ for (U u : leftValues) {
+ emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getJoinType() {
+ return "leftOuterJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
new file mode 100644
index 0000000..56476c1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, which is the case with a conventional (reducer-based)
+ * join.
+ * <p>
+ * <b>Note:</b>This utility is only supported when running with a
+ * {@link MRPipeline} as the pipeline.
+ */
+public class MapsideJoin {
+
+ /**
+ * Join two tables using a map side join. The right-side table will be loaded
+ * fully in memory, so this method should only be used if the right side
+ * table's contents can fit in the memory allocated to mappers. The join
+ * performed by this method is an inner join.
+ *
+ * @param left
+ * The left-side table of the join
+ * @param right
+ * The right-side table of the join, whose contents will be fully
+ * read into memory
+ * @return A table keyed on the join key, containing pairs of joined values
+ */
+ public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+ PTypeFamily tf = left.getTypeFamily();
+ Iterable<Pair<K, V>> iterable = right.materialize();
+
+ if (iterable instanceof MaterializableIterable) {
+ MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
+ MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
+ right.getPType());
+ ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
+ if (mi.isSourceTarget()) {
+ optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
+ }
+ return left.parallelDo("mapjoin", mapJoinDoFn,
+ tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+ optionsBuilder.build());
+ } else { // in-memory pipeline
+ return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
+ tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
+ }
+ }
+
+ static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+ private Multimap<K, V> joinMap;
+
+ public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
+ joinMap = HashMultimap.create();
+ for (Pair<K, V> joinPair : iterable) {
+ joinMap.put(joinPair.first(), joinPair.second());
+ }
+ }
+
+ @Override
+ public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ K key = input.first();
+ U value = input.second();
+ for (V joinValue : joinMap.get(key)) {
+ Pair<U, V> valuePair = Pair.of(value, joinValue);
+ emitter.emit(Pair.of(key, valuePair));
+ }
+ }
+ }
+
+ static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+ private String inputPath;
+ private PType<Pair<K, V>> ptype;
+ private Multimap<K, V> joinMap;
+
+ public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
+ this.inputPath = inputPath;
+ this.ptype = ptype;
+ }
+
+ private Path getCacheFilePath() {
+ Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
+ if (local == null) {
+ throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+ }
+ return local;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ DistCache.addCacheFile(new Path(inputPath), conf);
+ }
+
+ @Override
+ public void initialize() {
+ super.initialize();
+
+ ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
+ getCacheFilePath());
+ Iterable<Pair<K, V>> iterable = null;
+ try {
+ iterable = sourceTarget.read(getConfiguration());
+ } catch (IOException e) {
+ throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
+ }
+
+ joinMap = ArrayListMultimap.create();
+ for (Pair<K, V> joinPair : iterable) {
+ joinMap.put(joinPair.first(), joinPair.second());
+ }
+ }
+
+ @Override
+ public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ K key = input.first();
+ U value = input.second();
+ for (V joinValue : joinMap.get(key)) {
+ Pair<U, V> valuePair = Pair.of(value, joinValue);
+ emitter.emit(Pair.of(key, valuePair));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
new file mode 100644
index 0000000..2789d40
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an right outer join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+ private transient K lastKey;
+ private transient List<U> leftValues;
+
+ public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+ super(keyType, leftValueType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initialize() {
+ super.initialize();
+ lastKey = null;
+ this.leftValues = Lists.newArrayList();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ if (!key.equals(lastKey)) {
+ lastKey = keyType.getDetachedValue(key);
+ leftValues.clear();
+ }
+ if (id == 0) {
+ for (Pair<U, V> pair : pairs) {
+ if (pair.first() != null)
+ leftValues.add(leftValueType.getDetachedValue(pair.first()));
+ }
+ } else {
+ for (Pair<U, V> pair : pairs) {
+ // Make sure that right side gets emitted.
+ if (leftValues.isEmpty()) {
+ leftValues.add(null);
+ }
+
+ for (U u : leftValues) {
+ emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getJoinType() {
+ return "rightOuterJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
new file mode 100644
index 0000000..f1ad9f1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Inner and outer joins on collections.
+ */
+package org.apache.crunch.lib.join;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java b/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
new file mode 100644
index 0000000..2695787
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Joining, sorting, aggregating, and other commonly used functionality.
+ */
+package org.apache.crunch.lib;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
new file mode 100644
index 0000000..ae7f49a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.Arrays;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
+ */
+public class Comparators {
+
+ public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+
+ private RawComparator<T> comparator;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) {
+ JobConf jobConf = new JobConf(conf);
+ comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ }
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+ return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
+ }
+
+ @Override
+ public int compare(T o1, T o2) {
+ return -comparator.compare(o1, o2);
+ }
+ }
+
+ public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
+
+ private Schema schema;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) {
+ schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+ }
+ }
+
+ @Override
+ public int compare(AvroKey<T> o1, AvroKey<T> o2) {
+ return -ReflectData.get().compare(o1.datum(), o2.datum(), schema);
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+ return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+ }
+ }
+
+ public static class TupleWritableComparator extends WritableComparator implements Configurable {
+
+ private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+
+ private Configuration conf;
+ private ColumnOrder[] columnOrders;
+
+ public TupleWritableComparator() {
+ super(TupleWritable.class, true);
+ }
+
+ public static void configureOrdering(Configuration conf, Order... orders) {
+ conf.set(CRUNCH_ORDERING_PROPERTY,
+ Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
+ @Override
+ public String apply(Order o) {
+ return o.name();
+ }
+ })));
+ }
+
+ public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+ conf.set(CRUNCH_ORDERING_PROPERTY,
+ Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
+ @Override
+ public String apply(ColumnOrder o) {
+ return o.column() + ";" + o.order().name();
+ }
+ })));
+ }
+
+ @Override
+ public int compare(WritableComparable a, WritableComparable b) {
+ TupleWritable ta = (TupleWritable) a;
+ TupleWritable tb = (TupleWritable) b;
+ for (int index = 0; index < columnOrders.length; index++) {
+ int order = 1;
+ if (columnOrders[index].order() == Order.ASCENDING) {
+ order = 1;
+ } else if (columnOrders[index].order() == Order.DESCENDING) {
+ order = -1;
+ } else { // ignore
+ continue;
+ }
+ if (!ta.has(index) && !tb.has(index)) {
+ continue;
+ } else if (ta.has(index) && !tb.has(index)) {
+ return order;
+ } else if (!ta.has(index) && tb.has(index)) {
+ return -order;
+ } else {
+ Writable v1 = ta.get(index);
+ Writable v2 = tb.get(index);
+ if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
+ if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+ int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+ if (cmp != 0) {
+ return order * cmp;
+ }
+ } else {
+ int cmp = v1.hashCode() - v2.hashCode();
+ if (cmp != 0) {
+ return order * cmp;
+ }
+ }
+ }
+ }
+ }
+ return 0; // ordering using specified cols found no differences
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (conf != null) {
+ String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
+ String[] columnOrderNames = ordering.split(",");
+ columnOrders = new ColumnOrder[columnOrderNames.length];
+ for (int i = 0; i < columnOrders.length; i++) {
+ String[] split = columnOrderNames[i].split(";");
+ int column = Integer.parseInt(split[0]);
+ Order order = Order.valueOf(split[1]);
+ columnOrders[i] = ColumnOrder.by(column, order);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
new file mode 100644
index 0000000..be218f6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
+ */
+public class SortFns {
+
+ /**
+ * Extracts a single indexed key from a {@code Tuple} instance.
+ */
+ public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+ private final int index;
+
+ public SingleKeyFn(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public K map(V input) {
+ return (K) input.get(index);
+ }
+ }
+
+ /**
+ * Extracts a composite key from a {@code Tuple} instance.
+ */
+ public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+ private final int[] indices;
+ private final TupleFactory tupleFactory;
+
+ public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+ this.indices = indices;
+ this.tupleFactory = tupleFactory;
+ }
+
+ @Override
+ public K map(V input) {
+ Object[] values = new Object[indices.length];
+ for (int i = 0; i < indices.length; i++) {
+ values[i] = input.get(indices[i]);
+ }
+ return (K) tupleFactory.makeTuple(values);
+ }
+ }
+
+ /**
+ * Pulls a composite set of keys from an Avro {@code GenericRecord} instance.
+ */
+ public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+
+ private final int[] indices;
+ private final String schemaJson;
+ private transient Schema schema;
+
+ public AvroGenericFn(int[] indices, Schema schema) {
+ this.indices = indices;
+ this.schemaJson = schema.toString();
+ }
+
+ @Override
+ public void initialize() {
+ this.schema = (new Schema.Parser()).parse(schemaJson);
+ }
+
+ @Override
+ public GenericRecord map(V input) {
+ GenericRecord rec = new GenericData.Record(schema);
+ for (int i = 0; i < indices.length; i++) {
+ rec.put(i, input.get(indices[i]));
+ }
+ return rec;
+ }
+ }
+
+ /**
+ * Constructs an Avro schema for the given {@code PType<S>} that respects the given column
+ * orderings.
+ */
+ public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+ // Guarantee each tuple schema has a globally unique name
+ String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+ Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+ List<Schema.Field> fields = Lists.newArrayList();
+ AvroType<S> parentAvroType = (AvroType<S>) ptype;
+ Schema parentAvroSchema = parentAvroType.getSchema();
+
+ for (int index = 0; index < orders.length; index++) {
+ ColumnOrder columnOrder = orders[index];
+ AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+ Schema fieldSchema = atype.getSchema();
+ String fieldName = parentAvroSchema.getFields().get(index).name();
+ // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
+ // Text instances: making this consistent
+ Schema.Field.Order order = columnOrder.order() == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
+ Schema.Field.Order.ASCENDING;
+ fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ /**
+ * Utility class for encapsulating key extraction logic and serialization information about
+ * key extraction.
+ */
+ public static class KeyExtraction<V extends Tuple> {
+
+ private PType<V> ptype;
+ private final ColumnOrder[] columnOrder;
+ private final int[] cols;
+
+ private MapFn<V, Object> byFn;
+ private PType<Object> keyPType;
+
+ public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+ this.ptype = ptype;
+ this.columnOrder = columnOrder;
+ this.cols = new int[columnOrder.length];
+ for (int i = 0; i < columnOrder.length; i++) {
+ cols[i] = columnOrder[i].column() - 1;
+ }
+ init();
+ }
+
+ private void init() {
+ List<PType> pt = ptype.getSubTypes();
+ PTypeFamily ptf = ptype.getFamily();
+ if (cols.length == 1) {
+ byFn = new SingleKeyFn(cols[0]);
+ keyPType = pt.get(cols[0]);
+ } else {
+ TupleFactory tf = null;
+ switch (cols.length) {
+ case 2:
+ tf = TupleFactory.PAIR;
+ keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
+ break;
+ case 3:
+ tf = TupleFactory.TUPLE3;
+ keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
+ break;
+ case 4:
+ tf = TupleFactory.TUPLE4;
+ keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
+ break;
+ default:
+ PType[] pts = new PType[cols.length];
+ for (int i = 0; i < pts.length; i++) {
+ pts[i] = pt.get(cols[i]);
+ }
+ tf = TupleFactory.TUPLEN;
+ keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
+ }
+
+ if (ptf == AvroTypeFamily.getInstance()) {
+ Schema s = createOrderedTupleSchema(keyPType, columnOrder);
+ keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
+ byFn = new AvroGenericFn(cols, s);
+ } else {
+ byFn = new TupleKeyFn(cols, tf);
+ }
+ }
+
+ }
+
+ public MapFn<V, Object> getByFn() {
+ return byFn;
+ }
+
+ public PType<Object> getKeyType() {
+ return keyPType;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
new file mode 100644
index 0000000..94fbdbe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partition-aware {@code Partitioner} instance that can work with either Avro or Writable-formatted
+ * keys.
+ */
+public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
+
+ public static final String DEFAULT_PATH = "_partition.lst";
+ public static final String PARTITIONER_PATH =
+ "crunch.totalorderpartitioner.path";
+
+ private Configuration conf;
+ private Node<K> partitions;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ try {
+ this.conf = conf;
+ String parts = getPartitionFile(conf);
+ final Path partFile = new Path(parts);
+ final FileSystem fs = (DEFAULT_PATH.equals(parts))
+ ? FileSystem.getLocal(conf) // assume in DistributedCache
+ : partFile.getFileSystem(conf);
+
+ Job job = new Job(conf);
+ Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+ RawComparator<K> comparator =
+ (RawComparator<K>) job.getSortComparator();
+ K[] splitPoints = readPartitions(fs, partFile, keyClass, conf, comparator);
+ int numReduceTasks = job.getNumReduceTasks();
+ if (splitPoints.length != numReduceTasks - 1) {
+ throw new IOException("Wrong number of partitions in keyset");
+ }
+ partitions = new BinarySearchNode(splitPoints, comparator);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Can't read partitions file", e);
+ }
+ }
+
+ @Override
+ public int getPartition(K key, V value, int modulo) {
+ return partitions.findPartition(key);
+ }
+
+ public static void setPartitionFile(Configuration conf, Path p) {
+ conf.set(PARTITIONER_PATH, p.toString());
+ }
+
+ public static String getPartitionFile(Configuration conf) {
+ return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+ }
+
+ @SuppressWarnings("unchecked") // map output key class
+ private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+ Configuration conf, final RawComparator<K> comparator) throws IOException {
+ ArrayList<K> parts = new ArrayList<K>();
+ String schema = conf.get("crunch.schema");
+ if (schema != null) {
+ Schema s = (new Schema.Parser()).parse(schema);
+ AvroFileReaderFactory<K> a = new AvroFileReaderFactory<K>(s);
+ Iterator<K> iter = CompositePathIterable.create(fs, p, a).iterator();
+ while (iter.hasNext()) {
+ parts.add((K) new AvroKey<K>(iter.next()));
+ }
+ } else {
+ WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
+ SeqFileReaderFactory<K> s = new SeqFileReaderFactory<K>(keyClass);
+ Iterator<K> iter = CompositePathIterable.create(fs, p, s).iterator();
+ while (iter.hasNext()) {
+ parts.add((K) wdc.deepCopy((Writable) iter.next()));
+ }
+ }
+ Collections.sort(parts, comparator);
+ return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+ }
+
+ /**
+ * Interface to the partitioner to locate a key in the partition keyset.
+ */
+ interface Node<T> {
+ /**
+ * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+ * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+ */
+ int findPartition(T key);
+ }
+
+ class BinarySearchNode implements Node<K> {
+ private final K[] splitPoints;
+ private final RawComparator<K> comparator;
+ BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+ this.splitPoints = splitPoints;
+ this.comparator = comparator;
+ }
+ public int findPartition(K key) {
+ final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+ return (pos < 0) ? -pos : pos;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
new file mode 100644
index 0000000..2dcc64f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.hadoop.fs.Path;
+
+public class MaterializableIterable<E> implements Iterable<E> {
+
+ private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
+
+ private final Pipeline pipeline;
+ private final ReadableSource<E> source;
+ private Iterable<E> materialized;
+
+ public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
+ this.pipeline = pipeline;
+ this.source = source;
+ this.materialized = null;
+ }
+
+ public ReadableSource<E> getSource() {
+ return source;
+ }
+
+ public boolean isSourceTarget() {
+ return (source instanceof SourceTarget);
+ }
+
+ public Path getPath() {
+ if (source instanceof FileSourceImpl) {
+ return ((FileSourceImpl) source).getPath();
+ } else if (source instanceof PathTarget) {
+ return ((PathTarget) source).getPath();
+ }
+ return null;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ if (materialized == null) {
+ pipeline.run();
+ materialize();
+ }
+ return materialized.iterator();
+ }
+
+ public void materialize() {
+ try {
+ materialized = source.read(pipeline.getConfiguration());
+ } catch (IOException e) {
+ LOG.error("Could not materialize: " + source, e);
+ throw new CrunchRuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
new file mode 100644
index 0000000..69082e2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+
+public class MaterializableMap<K, V> extends AbstractMap<K, V> {
+
+ private Iterable<Pair<K, V>> iterable;
+ private Set<Map.Entry<K, V>> entrySet;
+
+ public MaterializableMap(Iterable<Pair<K, V>> iterable) {
+ this.iterable = iterable;
+ }
+
+ private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
+ HashMap<K, V> m = new HashMap<K, V>();
+ for (Pair<K, V> x : xs)
+ m.put(x.first(), x.second());
+ return m.entrySet();
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entrySet() {
+ if (entrySet == null)
+ entrySet = toMapEntries(iterable);
+ return entrySet;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
new file mode 100644
index 0000000..60e64b1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+
+/**
+ * A concrete implementation of {@link org.apache.crunch.materialize.pobject.PObjectImpl} whose
+ * value is a Java {@link java.util.Collection} containing the elements of the underlying {@link
+ * PCollection} for this {@link org.apache.crunch.PObject}.
+ *
+ * @param <S> The value type for elements contained in the {@code Collection} value encapsulated
+ * by this {@code PObject}.
+ */
+public class CollectionPObject<S> extends PObjectImpl<S, Collection<S>> {
+
+ /**
+ * Constructs a new instance of this {@code PObject} implementation.
+ *
+ * @param collect The backing {@code PCollection} for this {@code PObject}.
+ */
+ public CollectionPObject(PCollection<S> collect) {
+ super(collect);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<S> process(Iterable<S> input) {
+ Collection<S> target = new ArrayList<S>();
+ Iterator<S> itr = input.iterator();
+ while (itr.hasNext()) {
+ target.add(itr.next());
+ }
+ return target;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
new file mode 100644
index 0000000..aa5fd9e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+
+/**
+ * A concrete implementation of {@link PObjectImpl} that uses the first element in the backing
+ * {@link PCollection} as the {@link org.apache.crunch.PObject} value.
+ *
+ * @param <T> The value type of this {@code PObject}.
+ */
+public class FirstElementPObject<T> extends PObjectImpl<T, T> {
+
+ /**
+ * Constructs a new instance of this {@code PObject} implementation.
+ *
+ * @param collect The backing {@code PCollection} for this {@code PObject}.
+ */
+ public FirstElementPObject(PCollection<T> collect) {
+ super(collect);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T process(Iterable<T> input) {
+ Iterator<T> itr = input.iterator();
+ if (itr.hasNext()) {
+ return itr.next();
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
new file mode 100644
index 0000000..243997f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+
+/**
+ * A concrete implementation of {@link PObjectImpl} whose
+ * value is a Java {@link Map}. The underlying {@link PCollection} for this
+ * {@link org.apache.crunch.PObject} must contain {@link Pair}s of values. The
+ * first element of the pair will be used as the map key, while the second element will be used
+ * as the map value. Note that the contents of the underlying {@code PCollection} may not be
+ * reflected in the returned {@code Map}, since a single key may be mapped to several values in
+ * the underlying {@code PCollection}, and only one of those values will appear in the {@code
+ * Map} encapsulated by this {@code PObject}.
+ *
+ * @param <K> The type of keys for the Map.
+ * @param <V> The type of values for the Map.
+ */
+public class MapPObject<K, V> extends PObjectImpl<Pair<K, V>, Map<K, V>> {
+
+ /**
+ * Constructs a new instance of this {@code PObject} implementation.
+ *
+ * @param collect The backing {@code PCollection} for this {@code PObject}.
+ */
+ public MapPObject(PCollection<Pair<K, V>> collect) {
+ super(collect);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Map<K, V> process(Iterable<Pair<K, V>> input) {
+ Map<K, V> target = new HashMap<K, V>();
+ Iterator<Pair<K, V>> itr = input.iterator();
+ while (itr.hasNext()) {
+ Pair<K, V> pair = itr.next();
+ target.put(pair.first(), pair.second());
+ }
+ return target;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
new file mode 100644
index 0000000..59c2ba2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+
+/**
+ * An abstract implementation of {@link PObject} that is backed by a {@link PCollection}.
+ * Clients creating a concrete implementation should override the method
+ * {@link PObjectImpl#process(Iterable)}, which transforms the backing PCollection into the
+ * singleton value encapsulated by the PObject. Once this {code PObject}'s value has been
+ * calculated, the value is cached to prevent subsequent materializations of the backing
+ * {@code PCollection}.
+ *
+ * @param <S> The type contained in the underlying PCollection.
+ * @param <T> The type encapsulated by this PObject.
+ */
+public abstract class PObjectImpl<S, T> implements PObject<T> {
+
+ // The underlying PCollection whose contents will be used to generate the value for this
+ // PObject.
+ private PCollection<S> collection;
+
+ // A variable to hold a cached copy of the value of this {@code PObject},
+ // to prevent unnecessary materializations of the backing {@code PCollection}.
+ private T cachedValue;
+
+ // A flag indicating if a value for this {@code PObject} has been cached.
+ private boolean isCached;
+
+ /**
+ * Constructs a new instance of this {@code PObject} implementation.
+ *
+ * @param collect The backing {@code PCollection} for this {@code PObject}.
+ */
+ public PObjectImpl(PCollection<S> collect) {
+ this.collection = collect;
+ this.cachedValue = null;
+ this.isCached = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return collection.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final T getValue() {
+ if (!isCached) {
+ cachedValue = process(collection.materialize());
+ isCached = true;
+ }
+ return cachedValue;
+ }
+
+ /**
+ * Transforms the provided Iterable, obtained from the backing {@link PCollection},
+ * into the value encapsulated by this {@code PObject}.
+ *
+ * @param input An Iterable whose elements correspond to those of the backing {@code
+ * PCollection}.
+ * @return The value of this {@code PObject}.
+ */
+ protected abstract T process(Iterable<S> input);
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/package-info.java b/crunch-core/src/main/java/org/apache/crunch/package-info.java
new file mode 100644
index 0000000..38f11bc
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Client-facing API and core abstractions.
+ *
+ * @see <a href="http://crunch.apache.org/intro.html">Introduction to
+ * Apache Crunch</a>
+ */
+package org.apache.crunch;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
new file mode 100644
index 0000000..151ab82
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Collections.
+ *
+ * @param <T> The type of Tuple implementation being copied
+ */
+public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
+
+ private PType<T> elementType;
+
+ public CollectionDeepCopier(PType<T> elementType) {
+ this.elementType = elementType;
+ }
+
+ @Override
+ public void initialize(Configuration conf) {
+ this.elementType.initialize(conf);
+ }
+
+ @Override
+ public Collection<T> deepCopy(Collection<T> source) {
+ if (source == null) {
+ return null;
+ }
+ List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
+ for (T value : source) {
+ copiedCollection.add(elementType.getDetachedValue(value));
+ }
+ return copiedCollection;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/Converter.java b/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
new file mode 100644
index 0000000..a0dbb16
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+
+/**
+ * Converts the input key/value from a MapReduce task into the input to a
+ * {@link DoFn}, or takes the output of a {@code DoFn} and writes it to the
+ * output key/values.
+ *
+ * @param <K> The MapReduce key type
+ * @param <V> The MapReduce value type
+ * @param <S> The type of a single converted record (input to, or output from, a {@code DoFn})
+ * @param <T> The type produced when converting a key together with an {@link Iterable} of values
+ */
+public interface Converter<K, V, S, T> extends Serializable {
+
+  /**
+   * Converts a single MapReduce key/value pair into one record.
+   */
+  S convertInput(K key, V value);
+
+  /**
+   * Converts a MapReduce key and an {@link Iterable} of its values into one record.
+   */
+  T convertIterableInput(K key, Iterable<V> value);
+
+  /**
+   * Extracts the MapReduce output key from a record.
+   */
+  K outputKey(S value);
+
+  /**
+   * Extracts the MapReduce output value from a record.
+   */
+  V outputValue(S value);
+
+  /**
+   * @return The class of the MapReduce key type
+   */
+  Class<K> getKeyClass();
+
+  /**
+   * @return The class of the MapReduce value type
+   */
+  Class<V> getValueClass();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
new file mode 100644
index 0000000..f146e86
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Strategy for producing deep copies of values.
+ * <p>
+ * Implementations may need {@link #initialize(Configuration)} to be called with the job's
+ * configuration before {@link #deepCopy(Object)} is used (for example, to initialize an
+ * underlying {@code PType}).
+ *
+ * @param <T> The type of value that will be copied
+ */
+public interface DeepCopier<T> extends Serializable {
+
+  /**
+   * Prepares this copier using a job-specific configuration.
+   *
+   * @param conf The job-specific configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Produces a deep copy of the given value.
+   *
+   * @param source The value to copy
+   * @return A deep copy of {@code source}
+   */
+  T deepCopy(T source);
+
+  /**
+   * A {@code DeepCopier} that performs no copying: {@link #deepCopy(Object)} hands back the very
+   * instance it was given, and {@link #initialize(Configuration)} does nothing.
+   *
+   * @param <V> The type of value being passed through
+   */
+  static class NoOpDeepCopier<V> implements DeepCopier<V> {
+
+    @Override
+    public void initialize(Configuration conf) {
+      // Nothing to set up: values are passed through untouched.
+    }
+
+    @Override
+    public V deepCopy(V source) {
+      return source;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
new file mode 100644
index 0000000..de8903b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Performs deep copies of {@code Map}s with {@code String} keys, based on the deep copying
+ * ({@link PType#getDetachedValue(Object)}) of the underlying value {@code PType}.
+ * <p>
+ * Keys are reused as-is, which is safe because {@code String}s are immutable. The copy is always
+ * returned as a new {@code HashMap}, so the iteration order of the source map is not guaranteed
+ * to be retained.
+ *
+ * @param <T> The type of the values contained in the maps being copied
+ */
+public class MapDeepCopier<T> implements DeepCopier<Map<String, T>> {
+
+  /** The type of the map's values; used to detach (deep copy) each value. */
+  private final PType<T> ptype;
+
+  /**
+   * @param ptype The {@code PType} of the values of the maps to be copied
+   */
+  public MapDeepCopier(PType<T> ptype) {
+    this.ptype = ptype;
+  }
+
+  /**
+   * Initializes the value {@code PType} with the given configuration so that it can be used to
+   * detach values.
+   *
+   * @param conf Job-specific configuration
+   */
+  @Override
+  public void initialize(Configuration conf) {
+    this.ptype.initialize(conf);
+  }
+
+  /**
+   * Returns a new map with the same keys as {@code source}, each mapped to a detached copy of the
+   * corresponding value.
+   *
+   * @param source The map to copy; may be {@code null}
+   * @return The copied map, or {@code null} if {@code source} is {@code null}
+   */
+  @Override
+  public Map<String, T> deepCopy(Map<String, T> source) {
+    if (source == null) {
+      return null;
+    }
+    // Presize for the known entry count (as CollectionDeepCopier does) to avoid rehashing.
+    Map<String, T> deepCopyMap = Maps.newHashMapWithExpectedSize(source.size());
+    for (Entry<String, T> entry : source.entrySet()) {
+      deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue()));
+    }
+    return deepCopyMap;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
new file mode 100644
index 0000000..d276cd6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * The {@code PType} instance for {@link PGroupedTable} instances. Its settings
+ * are derived from the {@code PTableType} that was grouped to create the
+ * {@code PGroupedTable} instance.
+ *
+ * @param <K> The key type of the grouped table
+ * @param <V> The type of the values grouped under each key
+ */
+public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
+
+  /**
+   * A lazy view over an {@link Iterable} of untyped objects that converts each element with a
+   * {@link MapFn} only when it is returned by {@code next()}.
+   *
+   * @param <V> The type the elements are converted to
+   */
+  protected static class PTypeIterable<V> implements Iterable<V> {
+    private final Iterable<Object> iterable;
+    private final MapFn<Object, V> mapFn;
+
+    public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
+      this.mapFn = mapFn;
+      this.iterable = iterable;
+    }
+
+    @Override
+    public Iterator<V> iterator() {
+      return new Iterator<V>() {
+        private final Iterator<Object> iter = iterable.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return iter.hasNext();
+        }
+
+        @Override
+        public V next() {
+          return mapFn.map(iter.next());
+        }
+
+        @Override
+        public void remove() {
+          iter.remove();
+        }
+      };
+    }
+
+    /**
+     * Renders all elements of this iterable.
+     * <p>
+     * NOTE(review): this obtains a fresh {@link #iterator()} and walks every element. If the
+     * wrapped iterable can only be traversed once (presumably the case for reducer values,
+     * which should be confirmed), calling this consumes it.
+     */
+    @Override
+    public String toString() {
+      return Iterables.toString(this);
+    }
+  }
+
+  /**
+   * Converts an untyped (key, values) pair into a typed {@code Pair<K, Iterable<V>>}. It applies
+   * the key {@link MapFn} to the key and wraps the values in a lazily-converting
+   * {@link PTypeIterable}. Lifecycle calls are forwarded to both delegate functions.
+   */
+  public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
+    private final MapFn<Object, K> keys;
+    private final MapFn<Object, V> values;
+
+    public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
+      this.keys = keys;
+      this.values = values;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      keys.configure(conf);
+      values.configure(conf);
+    }
+
+    /**
+     * Forwards the task context to the key and value functions.
+     * <p>
+     * NOTE(review): {@code super.setContext(context)} is not called here. If the base class
+     * stores the context, it will not be set on this function itself; confirm that nothing
+     * relies on it.
+     */
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keys.setContext(context);
+      values.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      keys.initialize();
+      values.initialize();
+    }
+
+    @Override
+    public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
+      // Parameterized (not raw) PTypeIterable, so no unchecked conversion is needed.
+      Iterable<V> mappedValues = new PTypeIterable<V>(values, input.second());
+      return Pair.<K, Iterable<V>> of(keys.map(input.first()), mappedValues);
+    }
+  }
+
+  /** The table type that was grouped to produce this grouped type. */
+  protected final PTableType<K, V> tableType;
+
+  public PGroupedTableType(PTableType<K, V> tableType) {
+    this.tableType = tableType;
+  }
+
+  /**
+   * @return The {@code PTableType} that was grouped to produce this type
+   */
+  public PTableType<K, V> getTableType() {
+    return tableType;
+  }
+
+  /** Delegates to the underlying table type. */
+  @Override
+  public PTypeFamily getFamily() {
+    return tableType.getFamily();
+  }
+
+  /** Delegates to the underlying table type. */
+  @Override
+  public List<PType> getSubTypes() {
+    return tableType.getSubTypes();
+  }
+
+  /** Delegates to the underlying table type. */
+  @Override
+  public Converter getConverter() {
+    return tableType.getConverter();
+  }
+
+  /**
+   * Subclass hook: by its name, presumably returns the {@link Converter} used for the grouped
+   * (shuffled) data. Confirm this against the implementations.
+   */
+  public abstract Converter getGroupingConverter();
+
+  /**
+   * Subclass hook: by its name, presumably configures the shuffle of {@code job} according to
+   * {@code options}. Confirm this against the implementations.
+   */
+  public abstract void configureShuffle(Job job, GroupingOptions options);
+
+  /**
+   * Grouped tables cannot be written out directly, so this always throws.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
+    throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
new file mode 100644
index 0000000..3d06f8b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+
+/**
+ * A {@code PType} for {@link PTable} objects, whose elements are {@link Pair}s. Beyond what
+ * {@code PType} offers, it gives separate access to the {@code PType} of the key and the
+ * {@code PType} of the value.
+ *
+ * @param <K> The key type of the table
+ * @param <V> The value type of the table
+ */
+public interface PTableType<K, V> extends PType<Pair<K, V>> {
+
+  /**
+   * @return The {@code PType} of the table's keys
+   */
+  PType<K> getKeyType();
+
+  /**
+   * @return The {@code PType} of the table's values
+   */
+  PType<V> getValueType();
+
+  /**
+   * @return The {@link PGroupedTableType} corresponding to this table type
+   */
+  PGroupedTableType<K, V> getGroupedTableType();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PType.java b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
new file mode 100644
index 0000000..ebddf84
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a
+ * serialization and storage format that is used to read/write data from/to HDFS. Every
+ * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from
+ * that {@code PCollection}.
+ *
+ * @param <T> The Java type of the values described by this {@code PType}
+ */
+public interface PType<T> extends Serializable {
+  /**
+   * Returns the Java type represented by this {@code PType}.
+   */
+  Class<T> getTypeClass();
+
+  /**
+   * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
+   */
+  PTypeFamily getFamily();
+
+  /**
+   * Returns the function that converts an untyped ({@code Object}) value into a {@code T}.
+   * The untyped value is presumably in this type's storage representation.
+   */
+  MapFn<Object, T> getInputMapFn();
+
+  /**
+   * Returns the function that converts a {@code T} into an untyped ({@code Object}) value.
+   * The result is presumably in this type's storage representation.
+   */
+  MapFn<T, Object> getOutputMapFn();
+
+  /**
+   * Returns the {@link Converter} that translates between MapReduce key/values and values of
+   * this type.
+   */
+  Converter getConverter();
+
+  /**
+   * Initialize this PType for use within a DoFn. This generally only needs to be called when using
+   * a PType for {@link #getDetachedValue(Object)}.
+   *
+   * @param conf Configuration object
+   * @see PType#getDetachedValue(Object)
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Returns a copy of a value (or the value itself) that can safely be retained.
+   * <p>
+   * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held
+   * on to for more than the scope of a single iteration. A reducer (and therefore also a DoFn
+   * that has an Iterable as input) re-uses deserialized values. More information on object reuse
+   * is available in the {@link DoFn} class documentation.
+   *
+   * @param value The value to be detached
+   * @return A deep copy of the input value, or the value itself if it can safely be retained as-is
+   */
+  T getDetachedValue(T value);
+
+  /**
+   * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
+   * specified by this {@code PType}.
+   *
+   * @param path The location of the data
+   */
+  ReadableSourceTarget<T> getDefaultFileSource(Path path);
+
+  /**
+   * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
+   */
+  List<PType> getSubTypes();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
new file mode 100644
index 0000000..9458f14
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+/**
+ * An abstract factory for {@code PType} instances. All types created by one family share the
+ * same serialization/storage backing format.
+ */
+public interface PTypeFamily {
+
+  /** @return A {@code PType} for {@link Void} values */
+  PType<Void> nulls();
+
+  /** @return A {@code PType} for {@link String} values */
+  PType<String> strings();
+
+  /** @return A {@code PType} for {@link Long} values */
+  PType<Long> longs();
+
+  /** @return A {@code PType} for {@link Integer} values */
+  PType<Integer> ints();
+
+  /** @return A {@code PType} for {@link Float} values */
+  PType<Float> floats();
+
+  /** @return A {@code PType} for {@link Double} values */
+  PType<Double> doubles();
+
+  /** @return A {@code PType} for {@link Boolean} values */
+  PType<Boolean> booleans();
+
+  /** @return A {@code PType} for {@link ByteBuffer} values */
+  PType<ByteBuffer> bytes();
+
+  /** @return A {@code PType} for instances of {@code clazz} */
+  <T> PType<T> records(Class<T> clazz);
+
+  /** @return A {@code PType} for collections whose elements are described by {@code ptype} */
+  <T> PType<Collection<T>> collections(PType<T> ptype);
+
+  /** @return A {@code PType} for {@code String}-keyed maps whose values are described by {@code ptype} */
+  <T> PType<Map<String, T>> maps(PType<T> ptype);
+
+  /** @return A {@code PType} for {@link Pair}s whose elements are described by {@code p1} and {@code p2} */
+  <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
+
+  /** @return A {@code PType} for {@link Tuple3}s whose elements are described by the given types */
+  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3);
+
+  /** @return A {@code PType} for {@link Tuple4}s whose elements are described by the given types */
+  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4);
+
+  /** @return A {@code PType} for {@link TupleN}s whose elements are described by {@code ptypes} */
+  PType<TupleN> tuples(PType<?>... ptypes);
+
+  /** @return A {@code PType} for the {@link Tuple} subclass {@code clazz}, with elements described by {@code ptypes} */
+  <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
+
+  /**
+   * Creates a {@code PType} for {@code T} layered on top of {@code base}. Values are converted
+   * from {@code S} to {@code T} with {@code inputFn}, and from {@code T} back to {@code S} with
+   * {@code outputFn}.
+   */
+  <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
+
+  /** @return A {@code PTableType} whose keys are described by {@code key} and values by {@code value} */
+  <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
+
+  /**
+   * Returns the equivalent of the given ptype for this family, if it exists.
+   */
+  <T> PType<T> as(PType<T> ptype);
+}