You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:30 UTC

[28/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
new file mode 100644
index 0000000..a3d30d2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an inner join.
+ * 
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    super.initialize();
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      lastKey = keyType.getDetachedValue(key);
+      leftValues.clear();
+    }
+    if (id == 0) { // from left
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else { // from right
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() {
+    return "innerJoin";
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
new file mode 100644
index 0000000..99aea5a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Represents a {@link org.apache.crunch.DoFn} for performing joins.
+ * 
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public abstract class JoinFn<K, U, V> extends
+    DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+
+  /** PType of the join key; initialized with the job configuration in {@link #initialize()}. */
+  protected PType<K> keyType;
+  /** PType of the left-side value; initialized with the job configuration in {@link #initialize()}. */
+  protected PType<U> leftValueType;
+
+  /**
+   * Stores the PTypes that subclasses use to create deep copies of keys and of
+   * left-side values.
+   *
+   * @param keyType The PType of the value used as the key of the join
+   * @param leftValueType The PType of the value type of the left side of the join
+   */
+  public JoinFn(PType<K> keyType, PType<U> leftValueType) {
+    this.leftValueType = leftValueType;
+    this.keyType = keyType;
+  }
+
+  /** Initializes both stored PTypes with the configuration returned by {@code getConfiguration()}. */
+  @Override
+  public void initialize() {
+    keyType.initialize(getConfiguration());
+    leftValueType.initialize(getConfiguration());
+  }
+
+  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
+  public abstract String getJoinType();
+
+  /**
+   * Performs the actual joining for one group of values.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter);
+
+  /**
+   * Unpacks the input record into key, side id and values, and hands them to
+   * {@link #join(Object, int, Iterable, Emitter)}.
+   *
+   * @param input The input record: a (key, side id) pair plus the values of that group.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    Pair<K, Integer> taggedKey = input.first();
+    join(taggedKey.first(), taggedKey.second(), input.second(), emitter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
new file mode 100644
index 0000000..6efeccb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Utilities that are useful in joining multiple data sets via a MapReduce.
+ * 
+ */
+public class JoinUtils {
+
+  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritablePartitioner.class;
+    } else {
+      return AvroIndexedRecordPartitioner.class;
+    }
+  }
+
+  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritableComparator.class;
+    } else {
+      return AvroPairGroupingComparator.class;
+    }
+  }
+
+  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
+    @Override
+    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
+      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+
+  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
+
+    private DataInputBuffer buffer = new DataInputBuffer();
+    private TupleWritable key1 = new TupleWritable();
+    private TupleWritable key2 = new TupleWritable();
+
+    @Override
+    public int compare(TupleWritable o1, TupleWritable o2) {
+      return ((WritableComparable) o1.get(0)).compareTo((WritableComparable) o2.get(0));
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        buffer.reset(b1, s1, l1);
+        key1.readFields(buffer);
+
+        buffer.reset(b2, s2, l2);
+        key2.readFields(buffer);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      return compare(key1, key2);
+    }
+  }
+
+  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
+    @Override
+    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
+      IndexedRecord record = (IndexedRecord) key.datum();
+      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+
+  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
+    private Schema schema;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
+        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
+        schema = keySchema.getFields().get(0).schema();
+      }
+    }
+
+    @Override
+    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+      return ReflectData.get().compare(x.datum(), y.datum(), schema);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
new file mode 100644
index 0000000..731c496
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a left outer join.
+ * 
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient int lastId;
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    super.initialize();
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // Make sure that left side always gets emitted.
+      if (0 == lastId) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+        }
+      }
+      lastKey = keyType.getDetachedValue(key);
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      for (U u : leftValues) {
+        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() {
+    return "leftOuterJoin";
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
new file mode 100644
index 0000000..56476c1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, unlike the output of a conventional (reducer-based)
+ * join, which is sorted.
+ * <p>
+ * <b>Note:</b> This utility is only supported when running with a
+ * {@link MRPipeline} as the pipeline.
+ */
+public class MapsideJoin {
+
+  /**
+   * Join two tables using a map side join. The right-side table will be loaded
+   * fully in memory, so this method should only be used if the right side
+   * table's contents can fit in the memory allocated to mappers. The join
+   * performed by this method is an inner join.
+   * <p>
+   * When materializing the right table yields a {@link MaterializableIterable},
+   * its path is handed to a {@link MapsideJoinDoFn}, which registers the path
+   * with the distributed cache. Otherwise the materialized values are loaded
+   * directly into an {@link InMemoryJoinFn}.
+   * 
+   * @param left
+   *          The left-side table of the join
+   * @param right
+   *          The right-side table of the join, whose contents will be fully
+   *          read into memory
+   * @return A table keyed on the join key, containing pairs of joined values
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily tf = left.getTypeFamily();
+    Iterable<Pair<K, V>> iterable = right.materialize();
+
+    if (iterable instanceof MaterializableIterable) {
+      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
+      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
+          right.getPType());
+      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
+      if (mi.isSourceTarget()) {
+        // Presumably declares the right side as an input dependency of this DoFn -- TODO confirm.
+        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
+      }
+      return left.parallelDo("mapjoin", mapJoinDoFn,
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+          optionsBuilder.build());
+    } else { // in-memory pipeline
+      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
+    }
+  }
+
+  /**
+   * Inner-join function whose right side is supplied as an already materialized
+   * iterable and is copied into an in-memory multimap at construction time.
+   */
+  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    /** Right-side values grouped by join key. */
+    private Multimap<K, V> joinMap;
+    
+    /**
+     * Loads every right-side pair into the lookup map.
+     *
+     * @param iterable the right-side (key, value) pairs
+     */
+    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
+      // A list-backed multimap keeps repeated (key, value) entries. A set-backed one would
+      // collapse them and produce fewer joined rows than MapsideJoinDoFn does for the same input.
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+    
+    /** Emits one (key, (left value, right value)) record per right-side value stored under the input's key. */
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
+    }
+  }
+  
+  /**
+   * Inner-join function whose right side is read from a file located through
+   * {@link DistCache} and loaded into an in-memory multimap during
+   * {@link #initialize()}.
+   */
+  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    /** Path of the materialized right-side table, as passed to the constructor. */
+    private String inputPath;
+    /** PType of the right-side (key, value) pairs; supplies the file source used for reading. */
+    private PType<Pair<K, V>> ptype;
+    /** Right-side values grouped by join key; populated in {@link #initialize()}. */
+    private Multimap<K, V> joinMap;
+
+    /**
+     * @param inputPath path of the materialized right-side table
+     * @param ptype PType of the right-side (key, value) pairs
+     */
+    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
+      this.inputPath = inputPath;
+      this.ptype = ptype;
+    }
+
+    /**
+     * Resolves the cache file path for {@link #inputPath} through {@link DistCache}.
+     *
+     * @return the path returned by {@code DistCache.getPathToCacheFile}
+     * @throws CrunchRuntimeException if no path is found for {@link #inputPath}
+     */
+    private Path getCacheFilePath() {
+      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
+      if (local == null) {
+        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+      }
+      return local;
+    }
+
+    /** Registers {@link #inputPath} as a cache file in the given configuration. */
+    @Override
+    public void configure(Configuration conf) {
+      DistCache.addCacheFile(new Path(inputPath), conf);
+    }
+    
+    /**
+     * Reads the right-side table from the cache file and loads it into the lookup map.
+     *
+     * @throws CrunchRuntimeException if reading the right side fails with an {@link IOException}
+     */
+    @Override
+    public void initialize() {
+      super.initialize();
+
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
+          getCacheFilePath());
+      Iterable<Pair<K, V>> iterable = null;
+      try {
+        iterable = sourceTarget.read(getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
+      }
+
+      // NOTE(review): keys and values are stored exactly as the reader returns them. If the
+      // reader reuses objects between records, detached copies may be needed here -- confirm.
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+
+    /** Emits one (key, (left value, right value)) record per right-side value stored under the input's key. */
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
new file mode 100644
index 0000000..2789d40
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a right outer join.
+ * 
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    super.initialize();
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      lastKey = keyType.getDetachedValue(key);
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        // Make sure that right side gets emitted.
+        if (leftValues.isEmpty()) {
+          leftValues.add(null);
+        }
+
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() {
+    return "rightOuterJoin";
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
new file mode 100644
index 0000000..f1ad9f1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Inner and outer joins on collections.
+ */
+package org.apache.crunch.lib.join;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java b/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
new file mode 100644
index 0000000..2695787
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Joining, sorting, aggregating, and other commonly used functionality.
+ */
+package org.apache.crunch.lib;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
new file mode 100644
index 0000000..ae7f49a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/Comparators.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.Arrays;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
+ */
+public class Comparators {
+  
+  /**
+   * Sorts keys in the reverse of the order defined by the {@code WritableComparator} that is
+   * registered for the job's map output key class.
+   */
+  public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+
+    // Forward-order comparator for the map output key class; only assigned by setConf.
+    private RawComparator<T> comparator;
+
+    /**
+     * Stores the configuration and, when it is non-null, looks up the comparator for the map
+     * output key class declared in it.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        JobConf jobConf = new JobConf(conf);
+        comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+      }
+    }
+
+    /**
+     * Compares two serialized keys in reverse order.
+     */
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+      // Swap the operands instead of negating the result: -Integer.MIN_VALUE is still
+      // Integer.MIN_VALUE, so negation would report the wrong sign for that value.
+      return comparator.compare(arg3, arg4, arg5, arg0, arg1, arg2);
+    }
+
+    /**
+     * Compares two deserialized keys in reverse order.
+     */
+    @Override
+    public int compare(T o1, T o2) {
+      // Operands swapped for the same reason as in the raw-bytes overload.
+      return comparator.compare(o2, o1);
+    }
+  }
+
+  /**
+   * Sorts {@code AvroKey} values in the reverse of the order that Avro defines for the schema
+   * stored in the configuration under {@code crunch.schema}.
+   */
+  public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
+
+    // Parsed from the "crunch.schema" property; only assigned by setConf.
+    private Schema schema;
+
+    /**
+     * Stores the configuration and, when it is non-null, parses the schema it carries.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      }
+    }
+
+    /**
+     * Compares the datums of two keys in reverse schema order.
+     */
+    @Override
+    public int compare(AvroKey<T> o1, AvroKey<T> o2) {
+      // Swap the operands instead of negating the result: -Integer.MIN_VALUE is still
+      // Integer.MIN_VALUE, so negation would report the wrong sign for that value.
+      return ReflectData.get().compare(o2.datum(), o1.datum(), schema);
+    }
+
+    /**
+     * Compares two serialized keys in reverse schema order.
+     */
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+      // Operands swapped for the same reason as in the object overload.
+      return BinaryData.compare(arg3, arg4, arg5, arg0, arg1, arg2, schema);
+    }
+  }
+
+  /**
+   * Compares {@link TupleWritable} keys column by column, using the per-column orderings stored
+   * in the configuration under {@code crunch.ordering}.
+   */
+  public static class TupleWritableComparator extends WritableComparator implements Configurable {
+
+    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+
+    private Configuration conf;
+    // Parsed from CRUNCH_ORDERING_PROPERTY; only assigned by setConf.
+    private ColumnOrder[] columnOrders;
+
+    public TupleWritableComparator() {
+      super(TupleWritable.class, true);
+    }
+
+    /**
+     * Stores the given orders in the configuration as a comma-separated list of {@code Order}
+     * names, one entry per tuple position.
+     */
+    public static void configureOrdering(Configuration conf, Order... orders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
+            @Override
+            public String apply(Order o) {
+              return o.name();
+            }
+          })));
+    }
+
+    /**
+     * Stores the given column orders in the configuration as a comma-separated list of
+     * {@code column;ORDER} entries.
+     */
+    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
+            @Override
+            public String apply(ColumnOrder o) {
+              return o.column() + ";" + o.order().name();
+            }
+          })));
+    }
+
+    /**
+     * Compares two tuples position by position. The i-th configured ordering is applied to the
+     * i-th tuple element; positions whose order is neither ascending nor descending are skipped.
+     * A tuple that has a value at a position sorts after one that does not (before it when the
+     * position is descending).
+     *
+     * @return a negative, zero or positive value; zero when no configured position differs
+     */
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      TupleWritable ta = (TupleWritable) a;
+      TupleWritable tb = (TupleWritable) b;
+      for (int index = 0; index < columnOrders.length; index++) {
+        int order;
+        if (columnOrders[index].order() == Order.ASCENDING) {
+          order = 1;
+        } else if (columnOrders[index].order() == Order.DESCENDING) {
+          order = -1;
+        } else { // ignore
+          continue;
+        }
+        if (!ta.has(index) && !tb.has(index)) {
+          continue;
+        } else if (ta.has(index) && !tb.has(index)) {
+          return order;
+        } else if (!ta.has(index) && tb.has(index)) {
+          return -order;
+        } else {
+          Writable v1 = ta.get(index);
+          Writable v2 = tb.get(index);
+          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
+            int cmp;
+            if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+              cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+            } else {
+              // Values that are not comparable fall back to an ordering by hash code.
+              cmp = compareHashCodes(v1.hashCode(), v2.hashCode());
+            }
+            if (cmp != 0) {
+              // Reduce to -1/1 before applying the direction so that a compareTo result of
+              // Integer.MIN_VALUE cannot overflow when multiplied by -1.
+              return order * Integer.signum(cmp);
+            }
+          }
+        }
+      }
+      return 0; // ordering using specified cols found no differences
+    }
+
+    /**
+     * Orders two hash codes without subtracting them, since the difference of two ints can
+     * overflow and flip the sign of the result.
+     */
+    private static int compareHashCodes(int h1, int h2) {
+      if (h1 < h2) {
+        return -1;
+      }
+      return (h1 == h2) ? 0 : 1;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration and, when it is non-null, parses the orderings written by either
+     * {@code configureOrdering} overload.
+     *
+     * @throws IllegalArgumentException if the ordering property is not set
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      if (conf != null) {
+        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
+        if (ordering == null) {
+          throw new IllegalArgumentException("Missing configuration property: " + CRUNCH_ORDERING_PROPERTY);
+        }
+        String[] columnOrderNames = ordering.split(",");
+        columnOrders = new ColumnOrder[columnOrderNames.length];
+        for (int i = 0; i < columnOrders.length; i++) {
+          String[] split = columnOrderNames[i].split(";");
+          if (split.length == 1) {
+            // Entry written by configureOrdering(Configuration, Order...): it carries only the
+            // order name, so the entry's position stands in for the column number. compare()
+            // reads only order(), never column().
+            columnOrders[i] = ColumnOrder.by(i + 1, Order.valueOf(split[0]));
+          } else {
+            int column = Integer.parseInt(split[0]);
+            Order order = Order.valueOf(split[1]);
+            columnOrders[i] = ColumnOrder.by(column, order);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
new file mode 100644
index 0000000..be218f6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/SortFns.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
+ */
+public class SortFns {
+
+  /**
+   * Extracts a single indexed key from a {@code Tuple} instance.
+   */
+  public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+    private final int index;
+    
+    public SingleKeyFn(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public K map(V input) {
+      return (K) input.get(index);
+    }
+  }
+
+  /**
+   * A {@code MapFn} that builds a composite key by picking several positions out of each input
+   * {@code Tuple} and handing them to a {@code TupleFactory}.
+   */
+  public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+    // Positions to read from the input tuple, in the order they appear in the key.
+    private final int[] indices;
+    private final TupleFactory tupleFactory;
+
+    public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+      this.indices = indices;
+      this.tupleFactory = tupleFactory;
+    }
+
+    @Override
+    public K map(V input) {
+      Object[] picked = new Object[indices.length];
+      int slot = 0;
+      for (int sourceIndex : indices) {
+        picked[slot] = input.get(sourceIndex);
+        slot++;
+      }
+      return (K) tupleFactory.makeTuple(picked);
+    }
+  }
+  
+  /**
+   * Pulls a composite set of keys from an Avro {@code GenericRecord} instance.
+   */
+  public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+
+    private final int[] indices;
+    private final String schemaJson;
+    private transient Schema schema;
+    
+    public AvroGenericFn(int[] indices, Schema schema) {
+      this.indices = indices;
+      this.schemaJson = schema.toString();
+    }
+    
+    @Override
+    public void initialize() {
+      this.schema = (new Schema.Parser()).parse(schemaJson);
+    }
+    
+    @Override
+    public GenericRecord map(V input) {
+      GenericRecord rec = new GenericData.Record(schema);
+      for (int i = 0; i < indices.length; i++) {
+        rec.put(i, input.get(indices[i]));
+      }
+      return rec;
+    }
+  }
+  
+  /**
+   * Builds an Avro record schema with one field per entry of {@code orders}. Field {@code i}
+   * takes its name from field {@code i} of the schema of {@code ptype}, its schema from
+   * sub-type {@code i} of {@code ptype}, and its sort order from {@code orders[i]}.
+   *
+   * @param ptype the type whose fields are mirrored; it is cast to {@code AvroType}
+   * @param orders the ordering to apply to each field
+   * @return a new record schema in the {@code crunch} namespace with a randomly generated name
+   */
+  public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+    // A random UUID suffix keeps the names of generated record schemas distinct.
+    String uniqueSuffix = UUID.randomUUID().toString().replace('-', 'x');
+    Schema recordSchema = Schema.createRecord("tuple" + uniqueSuffix, "", "crunch", false);
+    List<Schema.Field> orderedFields = Lists.newArrayList();
+    AvroType<S> parentType = (AvroType<S>) ptype;
+    Schema parentSchema = parentType.getSchema();
+
+    for (int i = 0; i < orders.length; i++) {
+      AvroType<?> columnType = (AvroType<?>) ptype.getSubTypes().get(i);
+      Schema columnSchema = columnType.getSchema();
+      String columnName = parentSchema.getFields().get(i).name();
+      // Only DESCENDING is carried over as such; every other order becomes ASCENDING.
+      Schema.Field.Order avroOrder;
+      if (orders[i].order() == Order.DESCENDING) {
+        avroOrder = Schema.Field.Order.DESCENDING;
+      } else {
+        avroOrder = Schema.Field.Order.ASCENDING;
+      }
+      orderedFields.add(new Schema.Field(columnName, columnSchema, "", null, avroOrder));
+    }
+    recordSchema.setFields(orderedFields);
+    return recordSchema;
+  }
+
+  /**
+   * Works out, for a tuple type and a list of column orders, the function that extracts the
+   * sort key from each tuple and the {@code PType} of that key.
+   */
+  public static class KeyExtraction<V extends Tuple> {
+
+    private PType<V> ptype;
+    private final ColumnOrder[] columnOrder;
+    // Zero-based tuple positions, derived from ColumnOrder.column() minus one.
+    private final int[] cols;
+
+    private MapFn<V, Object> byFn;
+    private PType<Object> keyPType;
+
+    public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+      this.ptype = ptype;
+      this.columnOrder = columnOrder;
+      this.cols = new int[columnOrder.length];
+      int next = 0;
+      for (ColumnOrder co : columnOrder) {
+        cols[next] = co.column() - 1;
+        next++;
+      }
+      init();
+    }
+
+    /**
+     * Chooses the extraction function and key type from the number of key columns and the type
+     * family of the input.
+     */
+    private void init() {
+      List<PType> subTypes = ptype.getSubTypes();
+      PTypeFamily family = ptype.getFamily();
+
+      // A single column is used directly as the key; no tuple is built.
+      if (cols.length == 1) {
+        byFn = new SingleKeyFn(cols[0]);
+        keyPType = subTypes.get(cols[0]);
+        return;
+      }
+
+      TupleFactory factory;
+      if (cols.length == 2) {
+        factory = TupleFactory.PAIR;
+        keyPType = family.pairs(subTypes.get(cols[0]), subTypes.get(cols[1]));
+      } else if (cols.length == 3) {
+        factory = TupleFactory.TUPLE3;
+        keyPType = family.triples(subTypes.get(cols[0]), subTypes.get(cols[1]), subTypes.get(cols[2]));
+      } else if (cols.length == 4) {
+        factory = TupleFactory.TUPLE4;
+        keyPType = family.quads(subTypes.get(cols[0]), subTypes.get(cols[1]), subTypes.get(cols[2]),
+            subTypes.get(cols[3]));
+      } else {
+        PType[] selected = new PType[cols.length];
+        for (int i = 0; i < selected.length; i++) {
+          selected[i] = subTypes.get(cols[i]);
+        }
+        factory = TupleFactory.TUPLEN;
+        keyPType = (PType<Object>) (PType<?>) family.tuples(selected);
+      }
+
+      if (family == AvroTypeFamily.getInstance()) {
+        // Avro keys become generic records whose schema carries the per-field sort order.
+        Schema orderedSchema = createOrderedTupleSchema(keyPType, columnOrder);
+        keyPType = (PType<Object>) (PType<?>) Avros.generics(orderedSchema);
+        byFn = new AvroGenericFn(cols, orderedSchema);
+      } else {
+        byFn = new TupleKeyFn(cols, factory);
+      }
+    }
+
+    /** Returns the function that extracts the key from a tuple. */
+    public MapFn<V, Object> getByFn() {
+      return byFn;
+    }
+
+    /** Returns the type of the extracted key. */
+    public PType<Object> getKeyType() {
+      return keyPType;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
new file mode 100644
index 0000000..94fbdbe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A {@code Partitioner} that assigns each key to a partition by locating it among a sorted set
+ * of split points read from a partition file. The split points may be stored as either Avro or
+ * Writable-formatted keys.
+ */
+public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
+
+  public static final String DEFAULT_PATH = "_partition.lst";
+  public static final String PARTITIONER_PATH = "crunch.totalorderpartitioner.path";
+
+  private Configuration conf;
+  private Node<K> partitions;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Stores the configuration, reads the split points from the configured partition file and
+   * prepares the lookup structure used by {@link #getPartition}.
+   *
+   * @throws IllegalArgumentException if the partition file cannot be read or does not hold
+   *           exactly one split point fewer than the number of reduce tasks
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    try {
+      this.conf = conf;
+      String partitionFileName = getPartitionFile(conf);
+      final Path partitionPath = new Path(partitionFileName);
+      final FileSystem fs;
+      if (DEFAULT_PATH.equals(partitionFileName)) {
+        fs = FileSystem.getLocal(conf); // assume in DistributedCache
+      } else {
+        fs = partitionPath.getFileSystem(conf);
+      }
+
+      Job job = new Job(conf);
+      Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+      RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator();
+      K[] splitPoints = readPartitions(fs, partitionPath, keyClass, conf, comparator);
+      int expectedSplitPoints = job.getNumReduceTasks() - 1;
+      if (splitPoints.length != expectedSplitPoints) {
+        throw new IOException("Wrong number of partitions in keyset");
+      }
+      partitions = new BinarySearchNode(splitPoints, comparator);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+  /**
+   * Returns the partition for the key. The value and the {@code modulo} argument are not used.
+   */
+  @Override
+  public int getPartition(K key, V value, int modulo) {
+    return partitions.findPartition(key);
+  }
+
+  /** Records the location of the partition file in the configuration. */
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set(PARTITIONER_PATH, p.toString());
+  }
+
+  /** Returns the configured partition file location, or {@link #DEFAULT_PATH} if none is set. */
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+  }
+
+  /**
+   * Reads every split point from the partition file and returns them sorted with the given
+   * comparator. The file is read as Avro when {@code crunch.schema} is set in the configuration
+   * and as a sequence file of {@code keyClass} otherwise.
+   */
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      Configuration conf, final RawComparator<K> comparator) throws IOException {
+    ArrayList<K> splitPoints = new ArrayList<K>();
+    String schemaJson = conf.get("crunch.schema");
+    if (schemaJson == null) {
+      // Each key read from the sequence file is deep-copied before it is stored.
+      WritableDeepCopier copier = new WritableDeepCopier(keyClass);
+      SeqFileReaderFactory<K> seqFactory = new SeqFileReaderFactory<K>(keyClass);
+      for (Iterator<K> it = CompositePathIterable.create(fs, p, seqFactory).iterator(); it.hasNext();) {
+        splitPoints.add((K) copier.deepCopy((Writable) it.next()));
+      }
+    } else {
+      Schema avroSchema = (new Schema.Parser()).parse(schemaJson);
+      AvroFileReaderFactory<K> avroFactory = new AvroFileReaderFactory<K>(avroSchema);
+      for (Iterator<K> it = CompositePathIterable.create(fs, p, avroFactory).iterator(); it.hasNext();) {
+        splitPoints.add((K) new AvroKey<K>(it.next()));
+      }
+    }
+    Collections.sort(splitPoints, comparator);
+    K[] target = (K[]) Array.newInstance(keyClass, splitPoints.size());
+    return splitPoints.toArray(target);
+  }
+
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+
+  /**
+   * Locates keys by binary search over the sorted split points.
+   */
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+
+    public int findPartition(K key) {
+      // binarySearch returns the index when the key equals a split point, and
+      // -(insertionPoint) - 1 otherwise. Adding one yields index + 1 for a hit and
+      // -insertionPoint for a miss, so a negative value only needs its sign flipped.
+      final int shifted = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      if (shifted < 0) {
+        return -shifted;
+      }
+      return shifted;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
new file mode 100644
index 0000000..2dcc64f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.hadoop.fs.Path;
+
+public class MaterializableIterable<E> implements Iterable<E> {
+
+  private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
+
+  private final Pipeline pipeline;
+  private final ReadableSource<E> source;
+  private Iterable<E> materialized;
+
+  public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
+    this.pipeline = pipeline;
+    this.source = source;
+    this.materialized = null;
+  }
+
+  public ReadableSource<E> getSource() {
+    return source;
+  }
+
+  public boolean isSourceTarget() {
+    return (source instanceof SourceTarget);
+  }
+  
+  public Path getPath() {
+    if (source instanceof FileSourceImpl) {
+      return ((FileSourceImpl) source).getPath();
+    } else if (source instanceof PathTarget) {
+      return ((PathTarget) source).getPath();
+    }
+    return null;
+  }
+  
+  @Override
+  public Iterator<E> iterator() {
+    if (materialized == null) {
+      pipeline.run();
+      materialize();
+    }
+    return materialized.iterator();
+  }
+
+  public void materialize() {
+    try {
+      materialized = source.read(pipeline.getConfiguration());
+    } catch (IOException e) {
+      LOG.error("Could not materialize: " + source, e);
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
new file mode 100644
index 0000000..69082e2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+
+/**
+ * A read-through {@code Map} view over an {@code Iterable} of key/value pairs. The pairs are
+ * copied into a {@code HashMap} the first time {@link #entrySet()} is called, so when a key
+ * occurs more than once the last pair iterated wins.
+ */
+public class MaterializableMap<K, V> extends AbstractMap<K, V> {
+
+  private Iterable<Pair<K, V>> iterable;
+  // Null until entrySet() has been called once.
+  private Set<Map.Entry<K, V>> entrySet;
+
+  public MaterializableMap(Iterable<Pair<K, V>> iterable) {
+    this.iterable = iterable;
+  }
+
+  /** Copies every pair into a new {@code HashMap} and returns that map's entry set. */
+  private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
+    HashMap<K, V> collected = new HashMap<K, V>();
+    for (Pair<K, V> pair : xs) {
+      K key = pair.first();
+      V value = pair.second();
+      collected.put(key, value);
+    }
+    return collected.entrySet();
+  }
+
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    if (entrySet != null) {
+      return entrySet;
+    }
+    entrySet = toMapEntries(iterable);
+    return entrySet;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
new file mode 100644
index 0000000..60e64b1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+
+/**
+ * A {@link org.apache.crunch.materialize.pobject.PObjectImpl} whose value is a Java
+ * {@link java.util.Collection} holding the elements of the backing {@link PCollection}.
+ *
+ * @param <S> The type of the elements held by the {@code Collection} value of this
+ * {@code PObject}.
+ */
+public class CollectionPObject<S> extends PObjectImpl<S, Collection<S>> {
+
+  /**
+   * Creates a {@code PObject} backed by the given collection.
+   *
+   * @param collect The backing {@code PCollection} for this {@code PObject}.
+   */
+  public CollectionPObject(PCollection<S> collect) {
+    super(collect);
+  }
+
+  /**
+   * Copies every element of the input, in iteration order, into a new {@code ArrayList}.
+   */
+  @Override
+  public Collection<S> process(Iterable<S> input) {
+    Collection<S> copied = new ArrayList<S>();
+    for (S element : input) {
+      copied.add(element);
+    }
+    return copied;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
new file mode 100644
index 0000000..aa5fd9e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+
+/**
+ * A {@link PObjectImpl} whose value is the first element of the backing {@link PCollection},
+ * or {@code null} when there is no element.
+ *
+ * @param <T> The value type of this {@code PObject}.
+ */
+public class FirstElementPObject<T> extends PObjectImpl<T, T> {
+
+  /**
+   * Creates a {@code PObject} backed by the given collection.
+   *
+   * @param collect The backing {@code PCollection} for this {@code PObject}.
+   */
+  public FirstElementPObject(PCollection<T> collect) {
+    super(collect);
+  }
+
+  /**
+   * Returns the first element produced by the input's iterator, or {@code null} if the iterator
+   * has no elements.
+   */
+  @Override
+  public T process(Iterable<T> input) {
+    Iterator<T> elements = input.iterator();
+    return elements.hasNext() ? elements.next() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
new file mode 100644
index 0000000..243997f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+
+/**
+ * A concrete implementation of {@link PObjectImpl} whose
+ * value is a Java {@link Map}. The underlying {@link PCollection} for this
+ * {@link org.apache.crunch.PObject} must contain {@link Pair}s of values. The
+ * first element of the pair will be used as the map key, while the second element will be used
+ * as the map value.  Note that the contents of the underlying {@code PCollection} may not be
+ * reflected in the returned {@code Map}, since a single key may be mapped to several values in
+ * the underlying {@code PCollection}, and only one of those values will appear in the {@code
+ * Map} encapsulated by this {@code PObject}.
+ *
+ * @param <K> The type of keys for the Map.
+ * @param <V> The type of values for the Map.
+ */
+public class MapPObject<K, V> extends PObjectImpl<Pair<K, V>, Map<K, V>> {
+
+  /**
+   * Constructs a new instance of this {@code PObject} implementation.
+   *
+   * @param collect The backing {@code PCollection} for this {@code PObject}.
+   */
+  public MapPObject(PCollection<Pair<K, V>> collect) {
+    super(collect);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Map<K, V> process(Iterable<Pair<K, V>> input) {
+    Map<K, V> target = new HashMap<K, V>();
+    Iterator<Pair<K, V>> itr = input.iterator();
+    while (itr.hasNext()) {
+      Pair<K, V> pair = itr.next();
+      target.put(pair.first(), pair.second());
+    }
+    return target;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
new file mode 100644
index 0000000..59c2ba2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize.pobject;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+
+/**
+ * An abstract implementation of {@link PObject} that is backed by a {@link PCollection}.
+ * Clients creating a concrete implementation should override the method
+ * {@link PObjectImpl#process(Iterable)}, which transforms the backing PCollection into the
+ * singleton value encapsulated by the PObject. Once this {@code PObject}'s value has been
+ * calculated, the value is cached to prevent subsequent materializations of the backing
+ * {@code PCollection}.
+ *
+ * @param <S> The type contained in the underlying PCollection.
+ * @param <T> The type encapsulated by this PObject.
+ */
+public abstract class PObjectImpl<S, T> implements PObject<T> {
+
+  // The underlying PCollection whose contents will be used to generate the value for this
+  // PObject.
+  private PCollection<S> collection;
+
+  // A variable to hold a cached copy of the value of this {@code PObject},
+  // to prevent unnecessary materializations of the backing {@code PCollection}.
+  private T cachedValue;
+
+  // A flag indicating if a value for this {@code PObject} has been cached.
+  private boolean isCached;
+
+  /**
+   * Constructs a new instance of this {@code PObject} implementation.
+   *
+   * @param collect The backing {@code PCollection} for this {@code PObject}.
+   */
+  public PObjectImpl(PCollection<S> collect) {
+    this.collection = collect;
+    this.cachedValue = null;
+    this.isCached = false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return collection.toString();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public final T getValue() {
+    if (!isCached) {
+      cachedValue = process(collection.materialize());
+      isCached = true;
+    }
+    return cachedValue;
+  }
+
+  /**
+   * Transforms the provided Iterable, obtained from the backing {@link PCollection},
+   * into the value encapsulated by this {@code PObject}.
+   *
+   * @param input An Iterable whose elements correspond to those of the backing {@code
+   * PCollection}.
+   * @return The value of this {@code PObject}.
+   */
+  protected abstract T process(Iterable<S> input);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/package-info.java b/crunch-core/src/main/java/org/apache/crunch/package-info.java
new file mode 100644
index 0000000..38f11bc
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Client-facing API and core abstractions.
+ *
+ * @see <a href="http://crunch.apache.org/intro.html">Introduction to
+ *      Apache Crunch</a>
+ */
+package org.apache.crunch;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
new file mode 100644
index 0000000..151ab82
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Collections.
+ * 
+ * @param <T> The type of the elements in the collections being copied
+ */
+public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
+
+  private PType<T> elementType;
+
+  public CollectionDeepCopier(PType<T> elementType) {
+    this.elementType = elementType;
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    this.elementType.initialize(conf);
+  }
+
+  @Override
+  public Collection<T> deepCopy(Collection<T> source) {
+    if (source == null) {
+      return null;
+    }
+    List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
+    for (T value : source) {
+      copiedCollection.add(elementType.getDetachedValue(value));
+    }
+    return copiedCollection;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/Converter.java b/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
new file mode 100644
index 0000000..a0dbb16
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/Converter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+
+/**
+ * Converts the input key/value from a MapReduce task into the input to a
+ * {@link DoFn}, or takes the output of a {@code DoFn} and writes it to the
+ * output key/values.
+ */
+public interface Converter<K, V, S, T> extends Serializable {
+  S convertInput(K key, V value);
+
+  T convertIterableInput(K key, Iterable<V> value);
+
+  K outputKey(S value);
+
+  V outputValue(S value);
+
+  Class<K> getKeyClass();
+
+  Class<V> getValueClass();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
new file mode 100644
index 0000000..f146e86
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Performs deep copies of values.
+ * 
+ * @param <T> The type of value that will be copied
+ */
+public interface DeepCopier<T> extends Serializable {
+
+  /**
+   * Initialize the deep copier with a job-specific configuration
+   * 
+   * @param conf Job-specific configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Create a deep copy of a value.
+   * 
+   * @param source The value to be copied
+   * @return The deep copy of the value
+   */
+  T deepCopy(T source);
+
+  static class NoOpDeepCopier<V> implements DeepCopier<V> {
+
+    @Override
+    public V deepCopy(V source) {
+      return source;
+    }
+
+    @Override
+    public void initialize(Configuration conf) {
+      // No initialization needed
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
new file mode 100644
index 0000000..de8903b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/MapDeepCopier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+public class MapDeepCopier<T> implements DeepCopier<Map<String, T>> {
+
+  private final PType<T> ptype;
+
+  public MapDeepCopier(PType<T> ptype) {
+    this.ptype = ptype;
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    this.ptype.initialize(conf);
+  }
+
+  @Override
+  public Map<String, T> deepCopy(Map<String, T> source) {
+    if (source == null) {
+      return null;
+    }
+    
+    Map<String, T> deepCopyMap = Maps.newHashMap();
+    for (Entry<String, T> entry : source.entrySet()) {
+      deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue()));
+    }
+    return deepCopyMap;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
new file mode 100644
index 0000000..d276cd6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * The {@code PType} instance for {@link PGroupedTable} instances. Its settings
+ * are derived from the {@code PTableType} that was grouped to create the
+ * {@code PGroupedTable} instance.
+ * 
+ */
+public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
+
+  protected static class PTypeIterable<V> implements Iterable<V> {
+    private final Iterable<Object> iterable;
+    private final MapFn<Object, V> mapFn;
+
+    public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
+      this.mapFn = mapFn;
+      this.iterable = iterable;
+    }
+
+    public Iterator<V> iterator() {
+      return new Iterator<V>() {
+        Iterator<Object> iter = iterable.iterator();
+
+        public boolean hasNext() {
+          return iter.hasNext();
+        }
+
+        public V next() {
+          return mapFn.map(iter.next());
+        }
+
+        public void remove() {
+          iter.remove();
+        }
+      };
+    }
+    
+    @Override
+    public String toString() {
+      return Iterables.toString(this);
+    }
+  }
+
+  public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
+    private final MapFn<Object, K> keys;
+    private final MapFn<Object, V> values;
+
+    public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
+      this.keys = keys;
+      this.values = values;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      keys.configure(conf);
+      values.configure(conf);
+    }
+    
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keys.setContext(context);
+      values.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      keys.initialize();
+      values.initialize();
+    }
+
+    @Override
+    public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
+      return Pair.<K, Iterable<V>> of(keys.map(input.first()), new PTypeIterable(values, input.second()));
+    }
+  }
+
+  protected final PTableType<K, V> tableType;
+
+  public PGroupedTableType(PTableType<K, V> tableType) {
+    this.tableType = tableType;
+  }
+
+  public PTableType<K, V> getTableType() {
+    return tableType;
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return tableType.getFamily();
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    return tableType.getSubTypes();
+  }
+
+  @Override
+  public Converter getConverter() {
+    return tableType.getConverter();
+  }
+
+  public abstract Converter getGroupingConverter();
+
+  public abstract void configureShuffle(Job job, GroupingOptions options);
+
+  @Override
+  public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
+    throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
new file mode 100644
index 0000000..3d06f8b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTableType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+
+/**
+ * An extension of {@code PType} specifically for {@link PTable} objects. It
+ * allows separate access to the {@code PType}s of the key and value for the
+ * {@code PTable}.
+ * 
+ */
+public interface PTableType<K, V> extends PType<Pair<K, V>> {
+  /**
+   * Returns the key type for the table.
+   */
+  PType<K> getKeyType();
+
+  /**
+   * Returns the value type for the table.
+   */
+  PType<V> getValueType();
+
+  /**
+   * Returns the grouped table version of this type.
+   */
+  PGroupedTableType<K, V> getGroupedTableType();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PType.java b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
new file mode 100644
index 0000000..ebddf84
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PType.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a
+ * serialization and storage format that is used to read/write data from/to HDFS. Every
+ * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from
+ * that {@code PCollection}.
+ * 
+ */
+public interface PType<T> extends Serializable {
+  /**
+   * Returns the Java type represented by this {@code PType}.
+   */
+  Class<T> getTypeClass();
+
+  /**
+   * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
+   */
+  PTypeFamily getFamily();
+
+  MapFn<Object, T> getInputMapFn();
+
+  MapFn<T, Object> getOutputMapFn();
+
+  Converter getConverter();
+
+  /**
+   * Initialize this PType for use within a DoFn. This generally only needs to be called when using
+   * a PType for {@link #getDetachedValue(Object)}.
+   * 
+   * @param conf Configuration object
+   * @see PType#getDetachedValue(Object)
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Returns a copy of a value (or the value itself) that can safely be retained.
+   * <p>
+   * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held
+   * on to for more than the scope of a single iteration, as a reducer (and therefore also a DoFn
+   * that has an Iterable as input) re-uses deserialized values. More information on object reuse is
+   * available in the {@link DoFn} class documentation.
+   * 
+   * @param value The value to be deep-copied
+   * @return A deep copy of the input value
+   */
+  T getDetachedValue(T value);
+
+  /**
+   * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
+   * specified by this {@code PType}.
+   */
+  ReadableSourceTarget<T> getDefaultFileSource(Path path);
+
+  /**
+   * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
+   */
+  List<PType> getSubTypes();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
new file mode 100644
index 0000000..9458f14
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+/**
+ * An abstract factory for creating {@code PType} instances that have the same
+ * serialization/storage backing format.
+ * 
+ */
+public interface PTypeFamily {
+  PType<Void> nulls();
+
+  PType<String> strings();
+
+  PType<Long> longs();
+
+  PType<Integer> ints();
+
+  PType<Float> floats();
+
+  PType<Double> doubles();
+
+  PType<Boolean> booleans();
+
+  PType<ByteBuffer> bytes();
+
+  <T> PType<T> records(Class<T> clazz);
+
+  <T> PType<Collection<T>> collections(PType<T> ptype);
+
+  <T> PType<Map<String, T>> maps(PType<T> ptype);
+
+  <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
+
+  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3);
+
+  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4);
+
+  PType<TupleN> tuples(PType<?>... ptypes);
+
+  <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
+
+  <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
+
+  <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
+
+  /**
+   * Returns the equivalent of the given ptype for this family, if it exists.
+   */
+  <T> PType<T> as(PType<T> ptype);
+}