You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:31 UTC

[29/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
new file mode 100644
index 0000000..0c3e6a4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.avro.Schema;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroTextOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class TextFileTarget extends FileTargetImpl {
+  private static Class<? extends FileOutputFormat> getOutputFormat(PType<?> ptype) {
+    if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+      return AvroTextOutputFormat.class;
+    } else {
+      return TextOutputFormat.class;
+    }
+  }
+
+  public <T> TextFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public <T> TextFileTarget(Path path) {
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public <T> TextFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, null, fileNamingScheme);
+  }
+
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return "Text(" + path + ")";
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Converter converter = ptype.getConverter();
+    Class keyClass = converter.getKeyClass();
+    Class valueClass = converter.getValueClass();
+    configureForMapReduce(job, keyClass, valueClass, getOutputFormat(ptype), outputPath, name);
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (!isTextCompatible(ptype)) {
+      return null;
+    }
+    if (ptype instanceof PTableType) {
+      return new TextFileTableSourceTarget(path, (PTableType) ptype);
+    }
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+  
+  private <T> boolean isTextCompatible(PType<T> ptype) {
+    if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) {
+      AvroType<T> at = (AvroType<T>) ptype;
+      if (at.getSchema().equals(Schema.create(Schema.Type.STRING))) {
+        return true;
+      }
+    } else if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) {
+      if (ptype instanceof PTableType) {
+        PTableType ptt = (PTableType) ptype;
+        return isText(ptt.getKeyType()) && isText(ptt.getValueType());
+      } else {
+        return isText(ptype);
+      }
+    }
+    return false;
+  }
+  
+  private <T> boolean isText(PType<T> wtype) {
+    return Text.class.equals(((WritableType) wtype).getSerializationClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
new file mode 100644
index 0000000..d4109cc
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.materialize.pobject.FirstElementPObject;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Methods for performing various types of aggregations over {@link PCollection} instances.
+ */
+public class Aggregate {
+
+  /**
+   * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count
+   * of their occurrences.
+   *
+   * @param collect The PCollection whose elements should be counted.
+   * @param <S> The type of the PCollection.
+   * @return A {@code PTable} from each distinct element to its number of occurrences.
+   */
+  public static <S> PTable<S, Long> count(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {
+      @Override
+      public Pair<S, Long> map(S input) {
+        return Pair.of(input, 1L);
+      }
+    }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey()
+        .combineValues(Aggregators.SUM_LONGS());
+  }
+
+  /**
+   * Returns the number of elements in the provided PCollection.
+   * 
+   * @param collect The PCollection whose elements should be counted.
+   * @param <S> The type of the PCollection.
+   * @return A {@code PObject} containing the number of elements in the {@code PCollection}.
+   */
+  public static <S> PObject<Long> length(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    // Every element is mapped to the same constant key and grouped with a single
+    // reducer, so all partial counts are summed into one value.
+    PTable<Integer, Long> countTable = collect
+        .parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>() {
+          @Override
+          public Pair<Integer, Long> map(S input) {
+            return Pair.of(1, 1L);
+          }
+        }, tf.tableOf(tf.ints(), tf.longs()))
+        .groupByKey(GroupingOptions.builder().numReducers(1).build())
+        .combineValues(Aggregators.SUM_LONGS());
+    PCollection<Long> count = countTable.values();
+    return new FirstElementPObject<Long>(count);
+  }
+
+  /**
+   * Orders {@link Pair}s by their second element. The second element is cast to
+   * {@link Comparable}, so values that are not comparable fail with a
+   * {@code ClassCastException} at comparison time.
+   */
+  public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {
+    private final boolean ascending;
+
+    /**
+     * @param ascending {@code true} for natural order of the values, {@code false} for the reverse.
+     */
+    public PairValueComparator(boolean ascending) {
+      this.ascending = ascending;
+    }
+
+    @Override
+    public int compare(Pair<K, V> left, Pair<K, V> right) {
+      int cmp = ((Comparable<V>) left.second()).compareTo(right.second());
+      return ascending ? cmp : -cmp;
+    }
+  }
+
+  /**
+   * Keeps at most {@code limit} pairs seen by this function instance in a bounded
+   * priority queue and emits them, all under the constant key {@code 0}, during cleanup.
+   */
+  public static class TopKFn<K, V> extends DoFn<Pair<K, V>, Pair<Integer, Pair<K, V>>> {
+
+    private final int limit;
+    private final boolean maximize;
+    private transient PriorityQueue<Pair<K, V>> values;
+
+    /**
+     * @param limit maximum number of pairs to retain
+     * @param maximize {@code true} to retain the pairs with the largest values,
+     *          {@code false} to retain those with the smallest values
+     */
+    public TopKFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+
+    @Override
+    public void initialize() {
+      // PriorityQueue rejects an initial capacity below 1; clamp it so that a limit
+      // of 0 yields an empty result instead of an IllegalArgumentException.
+      this.values = new PriorityQueue<Pair<K, V>>(Math.max(1, limit),
+          new PairValueComparator<K, V>(maximize));
+    }
+
+    @Override
+    public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      values.add(input);
+      // The queue head is the least desirable element for the chosen direction,
+      // so evicting it keeps the best `limit` entries.
+      if (values.size() > limit) {
+        values.poll();
+      }
+    }
+
+    @Override
+    public void cleanup(Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      for (Pair<K, V> p : values) {
+        emitter.emit(Pair.of(0, p));
+      }
+    }
+  }
+
+  /**
+   * Combines the candidate pairs of a group down to at most {@code limit} entries and
+   * emits them best-first (largest value first when maximizing, smallest first otherwise).
+   */
+  public static class TopKCombineFn<K, V> extends CombineFn<Integer, Pair<K, V>> {
+
+    private final int limit;
+    private final boolean maximize;
+
+    /**
+     * @param limit maximum number of pairs to retain
+     * @param maximize {@code true} to retain the pairs with the largest values,
+     *          {@code false} to retain those with the smallest values
+     */
+    public TopKCombineFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+
+    @Override
+    public void process(Pair<Integer, Iterable<Pair<K, V>>> input,
+        Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
+      // Clamp the capacity: PriorityQueue throws for an initial capacity below 1.
+      PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(Math.max(1, limit), cmp);
+      for (Pair<K, V> pair : input.second()) {
+        queue.add(pair);
+        if (queue.size() > limit) {
+          queue.poll();
+        }
+      }
+
+      // Sort with the queue's comparator and walk backwards so the best entry comes first.
+      List<Pair<K, V>> values = Lists.newArrayList(queue);
+      Collections.sort(values, cmp);
+      for (int i = values.size() - 1; i >= 0; i--) {
+        emitter.emit(Pair.of(0, values.get(i)));
+      }
+    }
+  }
+
+  /**
+   * Selects up to {@code limit} entries of the table ranked by value.
+   *
+   * @param ptable the table to select from; its values must be {@link Comparable}
+   * @param limit the maximum number of entries to return
+   * @param maximize {@code true} to select the largest values, {@code false} for the smallest
+   * @return a table holding the selected entries
+   */
+  public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    PTableType<K, V> base = ptable.getPTableType();
+    PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
+    PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
+        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize))
+        .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
+          @Override
+          public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
+            emitter.emit(input.second());
+          }
+        }, base);
+  }
+
+  /**
+   * Returns the largest element of the input collection according to the elements'
+   * natural ordering.
+   *
+   * @param collect the collection to scan
+   * @return a {@code PObject} holding the largest element
+   * @throws IllegalArgumentException if the element class is neither primitive nor {@link Comparable}
+   */
+  public static <S> PObject<S> max(PCollection<S> collect) {
+    Class<S> clazz = collect.getPType().getTypeClass();
+    if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException("Can only get max for Comparable elements, not for: "
+          + collect.getPType().getTypeClass());
+    }
+    PTypeFamily tf = collect.getTypeFamily();
+    PCollection<S> maxCollect = PTables.values(collect
+        .parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
+          // Running maximum of the elements seen by this function instance.
+          private transient S max = null;
+
+          @Override
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (max == null || ((Comparable<S>) max).compareTo(input) < 0) {
+              max = input;
+            }
+          }
+
+          @Override
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (max != null) {
+              emitter.emit(Pair.of(true, max));
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1)
+        .combineValues(new CombineFn<Boolean, S>() {
+          @Override
+          public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
+            S max = null;
+            for (S v : input.second()) {
+              if (max == null || ((Comparable<S>) max).compareTo(v) < 0) {
+                max = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), max));
+          }
+        }));
+    return new FirstElementPObject<S>(maxCollect);
+  }
+
+  /**
+   * Returns the smallest element of the input collection according to the elements'
+   * natural ordering.
+   *
+   * @param collect the collection to scan
+   * @return a {@code PObject} holding the smallest element
+   * @throws IllegalArgumentException if the element class is neither primitive nor {@link Comparable}
+   */
+  public static <S> PObject<S> min(PCollection<S> collect) {
+    Class<S> clazz = collect.getPType().getTypeClass();
+    if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException("Can only get min for Comparable elements, not for: "
+          + collect.getPType().getTypeClass());
+    }
+    PTypeFamily tf = collect.getTypeFamily();
+    PCollection<S> minCollect = PTables.values(collect
+        .parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
+          // Running minimum of the elements seen by this function instance.
+          private transient S min = null;
+
+          @Override
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (min == null || ((Comparable<S>) min).compareTo(input) > 0) {
+              min = input;
+            }
+          }
+
+          @Override
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (min != null) {
+              emitter.emit(Pair.of(false, min));
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1)
+        .combineValues(new CombineFn<Boolean, S>() {
+          @Override
+          public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
+            S min = null;
+            for (S v : input.second()) {
+              if (min == null || ((Comparable<S>) min).compareTo(v) > 0) {
+                min = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), min));
+          }
+        }));
+    return new FirstElementPObject<S>(minCollect);
+  }
+
+  /**
+   * Groups the table by key and gathers all values of each key into a single collection.
+   *
+   * @param collect the table whose values should be gathered
+   * @return a table from each key to a collection of detached copies of its values
+   */
+  public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    final PType<V> valueType = collect.getValueType();
+    return collect.groupByKey().parallelDo("collect",
+        new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+
+          @Override
+          public void initialize() {
+            super.initialize();
+            valueType.initialize(getConfiguration());
+          }
+
+          @Override
+          public Collection<V> map(Iterable<V> values) {
+            List<V> collected = Lists.newArrayList();
+            for (V value : values) {
+              // Store a detached copy of each value rather than the iterated instance.
+              collected.add(valueType.getDetachedValue(value));
+            }
+            return collected;
+          }
+        }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
new file mode 100644
index 0000000..08327dd
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Random;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for Cartesian products of two {@code PTable} or {@code PCollection}
+ * instances.
+ */
+@SuppressWarnings("serial")
+public class Cartesian {
+
+  /**
+   * Helper for building the artificial cross keys. This technique was taken
+   * from Pig's CROSS.
+   */
+  private static class GFCross<V> extends DoFn<V, Pair<Pair<Integer, Integer>, V>> {
+
+    private final int constantField;
+    private final int parallelism;
+    private final Random r;
+
+    public GFCross(int constantField, int parallelism) {
+      this.constantField = constantField;
+      this.parallelism = parallelism;
+      this.r = new Random();
+    }
+
+    public void process(V input, Emitter<Pair<Pair<Integer, Integer>, V>> emitter) {
+      int c = r.nextInt(parallelism);
+      if (constantField == 0) {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(c, i), input));
+        }
+      } else {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(i, c), input));
+        }
+      }
+    }
+  }
+
+  static final int DEFAULT_PARALLELISM = 6;
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same
+   * strategy as Pig's CROSS operator).
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross
+   *      Join</a>
+   * @param left
+   *          A PTable to perform a cross join on.
+   * @param right
+   *          A PTable to perform a cross join on.
+   * @param <K1>
+   *          Type of left PTable's keys.
+   * @param <K2>
+   *          Type of right PTable's keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(PTable<K1, U> left, PTable<K2, V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same
+   * strategy as Pig's CROSS operator).
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross
+   *      Join</a>
+   * @param left
+   *          A PTable to perform a cross join on.
+   * @param right
+   *          A PTable to perform a cross join on.
+   * @param parallelism
+   *          The square root of the number of reducers to use. Increasing
+   *          parallelism also increases copied data.
+   * @param <K1>
+   *          Type of left PTable's keys.
+   * @param <K2>
+   *          Type of right PTable's keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(PTable<K1, U> left, PTable<K2, V> right,
+      int parallelism) {
+
+    /*
+     * The strategy here is to simply emulate the following PigLatin: A =
+     * foreach table1 generate flatten(GFCross(0, 2)), flatten(*); B = foreach
+     * table2 generate flatten(GFCross(1, 2)), flatten(*); C = cogroup A by ($0,
+     * $1), B by ($0, $1); result = foreach C generate flatten(A), flatten(B);
+     */
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    PTable<Pair<Integer, Integer>, Pair<K1, U>> leftCross = left.parallelDo(new GFCross<Pair<K1, U>>(0, parallelism),
+        ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), ltf.pairs(left.getKeyType(), left.getValueType())));
+    PTable<Pair<Integer, Integer>, Pair<K2, V>> rightCross = right.parallelDo(new GFCross<Pair<K2, V>>(1, parallelism),
+        rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), rtf.pairs(right.getKeyType(), right.getValueType())));
+
+    PTable<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> cg = leftCross.join(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    return cg.parallelDo(
+        new MapFn<Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
+
+          @Override
+          public Pair<Pair<K1, K2>, Pair<U, V>> map(Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> input) {
+            Pair<Pair<K1, U>, Pair<K2, V>> valuePair = input.second();
+            return Pair.of(Pair.of(valuePair.first().first(), valuePair.second().first()),
+                Pair.of(valuePair.first().second(), valuePair.second().second()));
+          }
+        },
+        ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()),
+            ctf.pairs(left.getValueType(), right.getValueType())));
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the
+   * same strategy as Pig's CROSS operator).
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross
+   *      Join</a>
+   * @param left
+   *          A PCollection to perform a cross join on.
+   * @param right
+   *          A PCollection to perform a cross join on.
+   * @param <U>
+   *          Type of the first {@link PCollection}'s values
+   * @param <V>
+   *          Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(PCollection<U> left, PCollection<V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the
+   * same strategy as Pig's CROSS operator).
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross
+   *      Join</a>
+   * @param left
+   *          A PCollection to perform a cross join on.
+   * @param right
+   *          A PCollection to perform a cross join on.
+   * @param <U>
+   *          Type of the first {@link PCollection}'s values
+   * @param <V>
+   *          Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(PCollection<U> left, PCollection<V> right, int parallelism) {
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    PTableType<Pair<Integer, Integer>, U> ptt = ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), left.getPType());
+
+    if (ptt == null)
+      throw new Error();
+
+    PTable<Pair<Integer, Integer>, U> leftCross = left.parallelDo(new GFCross<U>(0, parallelism),
+        ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), left.getPType()));
+    PTable<Pair<Integer, Integer>, V> rightCross = right.parallelDo(new GFCross<V>(1, parallelism),
+        rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), right.getPType()));
+
+    PTable<Pair<Integer, Integer>, Pair<U, V>> cg = leftCross.join(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() {
+      @Override
+      public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) {
+        return input.second();
+      }
+    }, ctf.pairs(left.getPType(), right.getPType()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
new file mode 100644
index 0000000..07d873c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Utility for co-grouping two {@link PTable} instances that share a key type.
+ */
+public class Cogroup {
+
+  /**
+   * Co-groups the two {@link PTable} arguments.
+   * 
+   * @return a {@code PTable} representing the co-grouped tables.
+   */
+  public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily family = left.getTypeFamily();
+    PType<K> keyType = left.getPTableType().getKeyType();
+    PType<U> leftValueType = left.getPTableType().getValueType();
+    PType<V> rightValueType = right.getPTableType().getValueType();
+
+    // Intermediate value type: a pair in which exactly one side is populated,
+    // marking which input table the value came from.
+    PType<Pair<U, V>> taggedType = family.pairs(leftValueType, rightValueType);
+    PType<Pair<Collection<U>, Collection<V>>> resultType = family.pairs(
+        family.collections(leftValueType), family.collections(rightValueType));
+
+    PTable<K, Pair<U, V>> taggedLeft = left.parallelDo("coGroupTag1", new CogroupFn1<K, U, V>(),
+        family.tableOf(keyType, taggedType));
+    PTable<K, Pair<U, V>> taggedRight = right.parallelDo("coGroupTag2", new CogroupFn2<K, U, V>(),
+        family.tableOf(keyType, taggedType));
+
+    return taggedLeft.union(taggedRight).groupByKey().parallelDo("cogroup",
+        new PostGroupFn<K, U, V>(leftValueType, rightValueType), family.tableOf(keyType, resultType));
+  }
+
+  /** Wraps a left-table value as the first element of a pair whose second element is null. */
+  private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(V v) {
+      return Pair.<V, U>of(v, null);
+    }
+  }
+
+  /** Wraps a right-table value as the second element of a pair whose first element is null. */
+  private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(U u) {
+      return Pair.<V, U>of(null, u);
+    }
+  }
+
+  /**
+   * Splits the tagged pairs of each key back into one collection per input table,
+   * storing detached copies of the values.
+   */
+  private static class PostGroupFn<K, V, U> extends
+      DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+    
+    private PType<V> ptypeV;
+    private PType<U> ptypeU;
+    
+    public PostGroupFn(PType<V> ptypeV, PType<U> ptypeU) {
+      this.ptypeV = ptypeV;
+      this.ptypeU = ptypeU;
+    }
+    
+    @Override
+    public void initialize() {
+      super.initialize();
+      ptypeV.initialize(getConfiguration());
+      ptypeU.initialize(getConfiguration());
+    }
+    
+    @Override
+    public void process(Pair<K, Iterable<Pair<V, U>>> input,
+        Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
+      Collection<V> fromLeft = Lists.newArrayList();
+      Collection<U> fromRight = Lists.newArrayList();
+      for (Pair<V, U> tagged : input.second()) {
+        V leftValue = tagged.first();
+        if (leftValue != null) {
+          fromLeft.add(ptypeV.getDetachedValue(leftValue));
+          continue;
+        }
+        U rightValue = tagged.second();
+        if (rightValue != null) {
+          fromRight.add(ptypeU.getDetachedValue(rightValue));
+        }
+      }
+      emitter.emit(Pair.of(input.first(), Pair.of(fromLeft, fromRight)));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
new file mode 100644
index 0000000..994830d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Set;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Functions for computing the distinct elements of a {@code PCollection}.
+ */
+public final class Distinct {
+
+  private static final int DEFAULT_FLUSH_EVERY = 50000;
+  
+  /**
+   * Construct a new {@code PCollection} that contains the unique elements of a
+   * given input {@code PCollection}.
+   * 
+   * @param input The input {@code PCollection}
+   * @return A new {@code PCollection} that contains the unique elements of the input
+   */
+  public static <S> PCollection<S> distinct(PCollection<S> input) {
+    return distinct(input, DEFAULT_FLUSH_EVERY);
+  }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+   */
+  public static <K, V> PTable<K, V> distinct(PTable<K, V> input) {
+    // The cast selects the PCollection overload; without it this call would
+    // resolve to this very method and recurse forever.
+    return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input));
+  }
+  
+  /**
+   * A {@code distinct} operation that gives the client more control over how frequently
+   * elements are flushed to disk in order to allow control over performance or
+   * memory consumption.
+   * 
+   * @param input The input {@code PCollection}
+   * @param flushEvery Flush the elements to disk whenever we encounter this many unique values
+   * @return A new {@code PCollection} that contains the unique elements of the input
+   * @throws IllegalArgumentException if {@code flushEvery} is not positive
+   */
+  public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) {
+    Preconditions.checkArgument(flushEvery > 0, "flushEvery must be positive, got: %s", flushEvery);
+    PType<S> pt = input.getPType();
+    PTypeFamily ptf = pt.getFamily();
+    return input
+        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
+        .groupByKey()
+        .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
+  }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code distinct} function.
+   */
+  public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) {
+    // The cast selects the PCollection overload rather than this method.
+    return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery));
+  }
+  
+  /**
+   * Buffers unique values in memory before the grouping step and emits each buffered
+   * value as a key with a null value whenever the buffer fills up, and once more at
+   * cleanup.
+   */
+  private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
+    private final Set<S> values = Sets.newHashSet();
+    private final int flushEvery;
+    private final PType<S> ptype;
+    
+    public PreDistinctFn(int flushEvery, PType<S> ptype) {
+      this.flushEvery = flushEvery;
+      this.ptype = ptype;
+    }
+    
+    @Override
+    public void initialize() {
+      super.initialize();
+      ptype.initialize(getConfiguration());
+    }
+    
+    @Override
+    public void process(S input, Emitter<Pair<S, Void>> emitter) {
+      // Buffer a detached copy of the input rather than the instance handed to us.
+      values.add(ptype.getDetachedValue(input));
+      // Flush as soon as flushEvery unique values are buffered, so the buffer
+      // never holds more than the documented limit.
+      if (values.size() >= flushEvery) {
+        cleanup(emitter);
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<S, Void>> emitter) {
+      for (S in : values) {
+        emitter.emit(Pair.<S, Void>of(in, null));
+      }
+      values.clear();
+    }
+  }
+  
+  /**
+   * Emits each grouped key exactly once, discarding the placeholder values.
+   */
+  private static class PostDistinctFn<S> extends DoFn<Pair<S, Iterable<Void>>, S> {
+    @Override
+    public void process(Pair<S, Iterable<Void>> input, Emitter<S> emitter) {
+      emitter.emit(input.first());
+    }
+  }
+  
+  // No instantiation
+  private Distinct() {}
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Join.java b/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
new file mode 100644
index 0000000..c0c4a6b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.FullOuterJoinFn;
+import org.apache.crunch.lib.join.InnerJoinFn;
+import org.apache.crunch.lib.join.JoinFn;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.lib.join.LeftOuterJoinFn;
+import org.apache.crunch.lib.join.RightOuterJoinFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for joining multiple {@code PTable} instances based on a common
+ * key.
+ */
+public class Join {
+  /**
+   * Performs an inner join on the specified {@link PTable}s.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner
+   *      Join</a>
+   * @param left
+   *          A PTable to perform an inner join on.
+   * @param right
+   *          A PTable to perform an inner join on.
+   * @param <K>
+   *          Type of the keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+    return innerJoin(left, right);
+  }
+
+  /**
+   * Performs an inner join on the specified {@link PTable}s.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner
+   *      Join</a>
+   * @param left
+   *          A PTable to perform an inner join on.
+   * @param right
+   *          A PTable to perform an inner join on.
+   * @param <K>
+   *          Type of the keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new InnerJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+  }
+
+  /**
+   * Performs a left outer join on the specified {@link PTable}s.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Left_outer_join">Left
+   *      Join</a>
+   * @param left
+   *          A PTable to perform a left join on. All of this PTable's entries
+   *          will appear in the resulting PTable.
+   * @param right
+   *          A PTable to perform a left join on.
+   * @param <K>
+   *          Type of the keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+  }
+
+  /**
+   * Performs a right outer join on the specified {@link PTable}s.
+   * 
+   * @see <a
+   *      href="http://en.wikipedia.org/wiki/Join_(SQL)#Right_outer_join">Right
+   *      Join</a>
+   * @param left
+   *          A PTable to perform a right join on.
+   * @param right
+   *          A PTable to perform a right join on. All of this PTable's entries
+   *          will appear in the resulting PTable.
+   * @param <K>
+   *          Type of the keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+  }
+
+  /**
+   * Performs a full outer join on the specified {@link PTable}s.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Full_outer_join">Full
+   *      Join</a>
+   * @param left
+   *          A PTable to perform a full join on.
+   * @param right
+   *          A PTable to perform a full join on.
+   * @param <K>
+   *          Type of the keys.
+   * @param <U>
+   *          Type of the first {@link PTable}'s values
+   * @param <V>
+   *          Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+  }
+
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+    PTableType<K, Pair<U, V>> ret = ptf
+        .tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
+
+    return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
+  }
+
+  private static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
+        ptf.pairs(left.getValueType(), right.getValueType()));
+
+    PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
+        new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
+            return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
+          }
+        }, ptt);
+    PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
+        new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
+            return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
+          }
+        }, ptt);
+
+    GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
+    optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
+
+    return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
new file mode 100644
index 0000000..e907680
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Methods for performing common operations on PTables.
+ * 
+ */
+public class PTables {
+
+  /**
+   * Convert the given {@code PCollection<Pair<K, V>>} to a {@code PTable<K, V>}.
+   * @param pcollect The {@code PCollection} to convert
+   * @return A {@code PTable} that contains the same data as the input {@code PCollection}
+   */
+  public static <K, V> PTable<K, V> asPTable(PCollection<Pair<K, V>> pcollect) {
+    PType<Pair<K, V>> pt = pcollect.getPType();
+    PTypeFamily ptf = pt.getFamily();
+    PTableType<K, V> ptt = ptf.tableOf(pt.getSubTypes().get(0), pt.getSubTypes().get(1));
+    DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance();
+    return pcollect.parallelDo("asPTable", id, ptt);
+  }
+  
+  /**
+   * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.
+   * @param ptable The {@code PTable}
+   * @return A {@code PCollection<K>}
+   */
+  public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
+    return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() {
+      @Override
+      public void process(Pair<K, V> input, Emitter<K> emitter) {
+        emitter.emit(input.first());
+      }
+    }, ptable.getKeyType());
+  }
+
+  /**
+   * Extract the values from the given {@code PTable<K, V>} as a {@code PCollection<V>}.
+   * @param ptable The {@code PTable}
+   * @return A {@code PCollection<V>}
+   */
+  public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
+    return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
+      @Override
+      public void process(Pair<K, V> input, Emitter<V> emitter) {
+        emitter.emit(input.second());
+      }
+    }, ptable.getValueType());
+  }
+
+  /**
+   * Create a detached value for a table {@link Pair}.
+   * 
+   * @param tableType The table type
+   * @param value The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType()
+        .getDetachedValue(value.second()));
+  }
+
+  /**
+   * Create a detached value for a {@link PGroupedTable} value.
+   * 
+   * 
+   * @param groupedTableType The grouped table type
+   * @param value The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+      PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
+
+    PTableType<K, V> tableType = groupedTableType.getTableType();
+    List<V> detachedIterable = Lists.newArrayList();
+    PType<V> valueType = tableType.getValueType();
+    for (V v : value.second()) {
+      detachedIterable.add(valueType.getDetachedValue(v));
+    }
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
+        (Iterable<V>) detachedIterable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
new file mode 100644
index 0000000..5a66101
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.SampleUtils.ReservoirSampleFn;
+import org.apache.crunch.lib.SampleUtils.SampleFn;
+import org.apache.crunch.lib.SampleUtils.WRSCombineFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Methods for performing random sampling in a distributed fashion, either by accepting each
+ * record in a {@code PCollection} with an independent probability in order to sample some
+ * fraction of the overall data set, or by using reservoir sampling in order to pull a uniform
+ * or weighted sample of fixed size from a {@code PCollection} of an unknown size. For more details
+ * on the reservoir sampling algorithms used by this library, see the A-ES algorithm described in
+ * <a href="http://arxiv.org/pdf/1012.0256.pdf">Efraimidis (2012)</a>.
+ */
+public class Sample {
+
+  /**
+   * Output records from the given {@code PCollection} with the given probability.
+   * 
+   * @param input The {@code PCollection} to sample from
+   * @param probability The probability (0.0 &lt; p &lt; 1.0)
+   * @return The output {@code PCollection} created from sampling
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, double probability) {
+    return sample(input, null, probability);
+  }
+
+  /**
+   * Output records from the given {@code PCollection} using a given seed. Useful for unit
+   * testing.
+   * 
+   * @param input The {@code PCollection} to sample from
+   * @param seed The seed for the random number generator
+   * @param probability The probability (0.0 &lt; p &lt; 1.0)
+   * @return The output {@code PCollection} created from sampling
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability) {
+    String stageName = String.format("sample(%.2f)", probability);
+    return input.parallelDo(stageName, new SampleFn<S>(probability, seed), input.getPType());
+  }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code sample} function.
+   * 
+   * @param input The {@code PTable} to sample from
+   * @param probability The probability (0.0 &lt; p &lt; 1.0)
+   * @return The output {@code PTable} created from sampling
+   */
+  public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability) {
+    return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, probability));
+  }
+  
+  /**
+   * A {@code PTable<K, V>} analogue of the {@code sample} function, with the seed argument
+   * exposed for testing purposes.
+   * 
+   * @param input The {@code PTable} to sample from
+   * @param seed The seed for the random number generator
+   * @param probability The probability (0.0 &lt; p &lt; 1.0)
+   * @return The output {@code PTable} created from sampling
+   */
+  public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability) {
+    return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, seed, probability));
+  }
+  
+  /**
+   * Select a fixed number of elements from the given {@code PCollection} with each element
+   * equally likely to be included in the sample.
+   * 
+   * @param input The input data
+   * @param sampleSize The number of elements to select
+   * @return A {@code PCollection} made up of the sampled elements
+   */
+  public static <T> PCollection<T> reservoirSample(
+      PCollection<T> input,
+      int sampleSize) {
+    return reservorSample(input, sampleSize, null);
+  }
+
+  /**
+   * A version of the reservoir sampling algorithm that uses a given seed, primarily for
+   * testing purposes.
+   * 
+   * @param input The input data
+   * @param sampleSize The number of elements to select
+   * @param seed The test seed
+   * @return A {@code PCollection} made up of the sampled elements
+
+   */
+  public static <T> PCollection<T> reservorSample(
+      PCollection<T> input,
+      int sampleSize,
+      Long seed) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<Pair<T, Integer>> ptype = ptf.pairs(input.getPType(), ptf.ints());
+    return weightedReservoirSample(
+        input.parallelDo(new MapFn<T, Pair<T, Integer>>() {
+          public Pair<T, Integer> map(T t) { return Pair.of(t, 1); }
+        }, ptype),
+        sampleSize,
+        seed);
+  }
+  
+  /**
+   * Selects a weighted sample of the elements of the given {@code PCollection}, where the second term in
+   * the input {@code Pair} is a numerical weight.
+   * 
+   * @param input the weighted observations
+   * @param sampleSize The number of elements to select
+   * @return A random sample of the given size that respects the weighting values
+   */
+  public static <T, N extends Number> PCollection<T> weightedReservoirSample(
+      PCollection<Pair<T, N>> input,
+      int sampleSize) {
+    return weightedReservoirSample(input, sampleSize, null);
+  }
+  
+  /**
+   * The weighted reservoir sampling function with the seed term exposed for testing purposes.
+   * 
+   * @param input the weighted observations
+   * @param sampleSize The number of elements to select
+   * @param seed The test seed
+   * @return A random sample of the given size that respects the weighting values
+   */
+  public static <T, N extends Number> PCollection<T> weightedReservoirSample(
+      PCollection<Pair<T, N>> input,
+      int sampleSize,
+      Long seed) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PTable<Integer, Pair<T, N>> groupedIn = input.parallelDo(
+        new MapFn<Pair<T, N>, Pair<Integer, Pair<T, N>>>() {
+          @Override
+          public Pair<Integer, Pair<T, N>> map(Pair<T, N> p) {
+            return Pair.of(0, p);
+          }
+        }, ptf.tableOf(ptf.ints(), input.getPType()));
+    int[] ss = new int[] { sampleSize };
+    return groupedWeightedReservoirSample(groupedIn, ss, seed)
+        .parallelDo(new MapFn<Pair<Integer, T>, T>() {
+          @Override
+          public T map(Pair<Integer, T> p) {
+            return p.second();
+          }
+        }, (PType<T>) input.getPType().getSubTypes().get(0));
+  }
+  
+  /**
+   * The most general of the weighted reservoir sampling patterns, which allows us to choose
+   * a random sample of elements for each of N input groups.
+   * 
+   * @param input A {@code PTable} with the key a group ID and the value a weighted observation in that group
+   * @param sampleSizes An array of length N, where each entry is the number of elements to include in that group
+   * @return A {@code PCollection} of the sampled elements for each of the groups
+   */
+  
+  public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(
+      PTable<Integer, Pair<T, N>> input,
+      int[] sampleSizes) {
+    return groupedWeightedReservoirSample(input, sampleSizes, null);
+  }
+  
+  /**
+   * Same as the other groupedWeightedReservoirSample method, but includes a seed for testing
+   * purposes.
+   * 
+   * @param input A {@code PTable} with the key a group ID and the value a weighted observation in that group
+   * @param sampleSizes An array of length N, where each entry is the number of elements to include in that group
+   * @param seed The test seed
+   * @return A {@code PCollection} of the sampled elements for each of the groups
+   */
+  public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(
+      PTable<Integer, Pair<T, N>> input,
+      int[] sampleSizes,
+      Long seed) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<T> ttype = (PType<T>) input.getPTableType().getValueType().getSubTypes().get(0);
+    PTableType<Integer, Pair<Double, T>> ptt = ptf.tableOf(ptf.ints(),
+        ptf.pairs(ptf.doubles(), ttype));
+    
+    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt)
+        .groupByKey(1)
+        .combineValues(new WRSCombineFn<T>(sampleSizes, ttype))
+        .parallelDo(new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() {
+          @Override
+          public Pair<Integer, T> map(Pair<Integer, Pair<Double, T>> p) {
+            return Pair.of(p.first(), p.second().second());
+          }
+        }, ptf.pairs(ptf.ints(), ttype));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/SampleUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SampleUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/SampleUtils.java
new file mode 100644
index 0000000..8769eed
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/SampleUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+class SampleUtils {
+  
+  static class SampleFn<S> extends FilterFn<S> {
+
+    private final Long seed;
+    private final double acceptanceProbability;
+    private transient Random r;
+
+    public SampleFn(double acceptanceProbability, Long seed) {
+      Preconditions.checkArgument(0.0 < acceptanceProbability && acceptanceProbability < 1.0);
+      this.seed = seed == null ? System.currentTimeMillis() : seed;
+      this.acceptanceProbability = acceptanceProbability;
+    }
+
+    @Override
+    public void initialize() {
+      if (r == null) {
+        r = new Random(seed);
+      }
+    }
+
+    @Override
+    public boolean accept(S input) {
+      return r.nextDouble() < acceptanceProbability;
+    }
+  }
+
+
+  static class ReservoirSampleFn<T, N extends Number>
+      extends DoFn<Pair<Integer, Pair<T, N>>, Pair<Integer, Pair<Double, T>>> {
+  
+    private int[] sampleSizes;
+    private Long seed;
+    private PType<T> valueType;
+    private transient List<SortedMap<Double, T>> reservoirs;
+    private transient Random random;
+    
+    public ReservoirSampleFn(int[] sampleSizes, Long seed, PType<T> valueType) {
+      this.sampleSizes = sampleSizes;
+      this.seed = seed;
+      this.valueType = valueType;
+    }
+    
+    @Override
+    public void initialize() {
+      this.reservoirs = Lists.newArrayList();
+      this.valueType.initialize(getConfiguration());
+      for (int i = 0; i < sampleSizes.length; i++) {
+        reservoirs.add(Maps.<Double, T>newTreeMap());
+      }
+      if (random == null) {
+        if (seed == null) {
+          this.random = new Random();
+        } else {
+          this.random = new Random(seed);
+        }
+      }
+    }
+    
+    @Override
+    public void process(Pair<Integer, Pair<T, N>> input,
+        Emitter<Pair<Integer, Pair<Double, T>>> emitter) {
+      int id = input.first();
+      Pair<T, N> p = input.second();
+      double weight = p.second().doubleValue();
+      if (weight > 0.0) {
+        double score = Math.log(random.nextDouble()) / weight;
+        SortedMap<Double, T> reservoir = reservoirs.get(id);
+        if (reservoir.size() < sampleSizes[id]) { 
+          reservoir.put(score, valueType.getDetachedValue(p.first()));        
+        } else if (score > reservoir.firstKey()) {
+          reservoir.remove(reservoir.firstKey());
+          reservoir.put(score, valueType.getDetachedValue(p.first()));
+        }
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<Integer, Pair<Double, T>>> emitter) {
+      for (int id = 0; id < reservoirs.size(); id++) {
+        SortedMap<Double, T> reservoir = reservoirs.get(id);
+        for (Map.Entry<Double, T> e : reservoir.entrySet()) {
+          emitter.emit(Pair.of(id, Pair.of(e.getKey(), e.getValue())));
+        }
+      }
+    }
+  }
+  
+  static class WRSCombineFn<T> extends CombineFn<Integer, Pair<Double, T>> {
+
+    private int[] sampleSizes;
+    private PType<T> valueType;
+    private List<SortedMap<Double, T>> reservoirs;
+    
+    public WRSCombineFn(int[] sampleSizes, PType<T> valueType) {
+      this.sampleSizes = sampleSizes;
+      this.valueType = valueType;
+    }
+
+    @Override
+    public void initialize() {
+      this.reservoirs = Lists.newArrayList();
+      for (int i = 0; i < sampleSizes.length; i++) {
+        reservoirs.add(Maps.<Double, T>newTreeMap());
+      }
+      this.valueType.initialize(getConfiguration());
+    }
+    
+    @Override
+    public void process(Pair<Integer, Iterable<Pair<Double, T>>> input,
+        Emitter<Pair<Integer, Pair<Double, T>>> emitter) {
+      SortedMap<Double, T> reservoir = reservoirs.get(input.first());
+      for (Pair<Double, T> p : input.second()) {
+        if (reservoir.size() < sampleSizes[input.first()]) { 
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));        
+        } else if (p.first() > reservoir.firstKey()) {
+          reservoir.remove(reservoir.firstKey());
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));  
+        }
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<Integer, Pair<Double, T>>> emitter) {
+      for (int i = 0; i < reservoirs.size(); i++) {
+        SortedMap<Double, T> reservoir = reservoirs.get(i);
+        for (Map.Entry<Double, T> e : reservoir.entrySet()) {
+          emitter.emit(Pair.of(i, Pair.of(e.getKey(), e.getValue())));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
new file mode 100644
index 0000000..54b4396
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>} collection.
+ * <p>
+ * Secondary sorts are usually performed during sessionization: given a collection
+ * of events, we want to group them by a key (such as a user ID), then sort the grouped
+ * records by an auxiliary key (such as a timestamp), and then perform some additional
+ * processing on the sorted records.
+ */
+public class SecondarySort {
+  
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}.
+   */
+  public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
+    return prepare(input)
+        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
+  }
+  
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}.
+   */
+  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
+    return prepare(input)
+        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype);
+  }
+  
+  private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare(
+      PTable<K, Pair<V1, V2>> input) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<Pair<V1, V2>> valueType = input.getValueType();
+    PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
+        ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)),
+        valueType);
+    PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(),
+        ptf.collections(input.getValueType()));
+    return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
+        .groupByKey(
+            GroupingOptions.builder()
+            .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
+            .partitionerClass(JoinUtils.getPartitionerClass(ptf))
+            .build());
+  }
+  
+  private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
+    @Override
+    public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
+      return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
+    }
+  }  
+
+  private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
+    private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;
+    
+    public SSWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern) {
+      this.intern = intern;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      intern.configure(conf);
+    }
+
+    @Override
+    public void initialize() {
+      intern.setContext(getContext());
+      intern.initialize();
+    }
+    
+    @Override
+    public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<T> emitter) {
+      intern.process(Pair.of(input.first().first(), input.second()), emitter);
+    }
+    
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      intern.cleanup(emitter);
+    }
+  }  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Set.java b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
new file mode 100644
index 0000000..0ba879c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for performing set operations (difference, intersection, etc) on
+ * {@code PCollection} instances.
+ */
+public class Set {
+
+  /**
+   * Computes the set difference of two collections. Both inputs are turned
+   * into tables keyed by element and cogrouped; a key is emitted when its
+   * group from <code>coll1</code> is non-empty and its group from
+   * <code>coll2</code> is empty.
+   *
+   * @return a collection containing the elements that are in
+   *         <code>coll1</code> but not in <code>coll2</code>
+   */
+  public static <T> PCollection<T> difference(PCollection<T> coll1, PCollection<T> coll2) {
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T> onlyInFirstFn =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> membership = input.second();
+            // Nothing to emit when the element is absent from coll1 or present in coll2.
+            if (membership.first().isEmpty() || !membership.second().isEmpty()) {
+              return;
+            }
+            emitter.emit(input.first());
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(onlyInFirstFn, coll1.getPType());
+  }
+
+  /**
+   * Computes the intersection of two collections. Both inputs are turned into
+   * tables keyed by element and cogrouped; a key is emitted when its groups
+   * from both inputs are non-empty.
+   *
+   * @return a collection containing the elements that are common to both
+   *         <code>coll1</code> and <code>coll2</code>
+   */
+  public static <T> PCollection<T> intersection(PCollection<T> coll1, PCollection<T> coll2) {
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T> inBothFn =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> membership = input.second();
+            // Nothing to emit when the element is missing from either side.
+            if (membership.first().isEmpty() || membership.second().isEmpty()) {
+              return;
+            }
+            emitter.emit(input.first());
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(inBothFn, coll1.getPType());
+  }
+
+  /**
+   * Classifies the elements of two collections by membership, like the Unix
+   * <code>comm</code> utility. One {@link Tuple3} is emitted per cogrouped
+   * key, and the slot that holds the element tells where it was found:
+   * <ol>
+   * <li>first slot: the element is only in <code>coll1</code>,</li>
+   * <li>second slot: the element is only in <code>coll2</code>,</li>
+   * <li>third slot: the element is in both collections.</li>
+   * </ol>
+   * The remaining slots of each tuple are <code>null</code>.
+   *
+   * @return a collection of {@link Tuple3} objects
+   */
+  public static <T> PCollection<Tuple3<T, T, T>> comm(PCollection<T> coll1, PCollection<T> coll2) {
+    PType<T> elementType = coll1.getPType();
+    PTypeFamily family = coll1.getTypeFamily();
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>> classifyFn =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<Tuple3<T, T, T>> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> membership = input.second();
+            boolean inFirst = !membership.first().isEmpty();
+            boolean inSecond = !membership.second().isEmpty();
+            T element = input.first();
+
+            // At most one of the three slots receives the element.
+            T onlyFirst = null;
+            T onlySecond = null;
+            T both = null;
+            if (inFirst && inSecond) {
+              both = element;
+            } else if (inFirst) {
+              onlyFirst = element;
+            } else if (inSecond) {
+              onlySecond = element;
+            }
+            emitter.emit(Tuple3.of(onlyFirst, onlySecond, both));
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(classifyFn,
+        family.triples(elementType, elementType, elementType));
+  }
+
+  private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
+    PTypeFamily typeFamily = coll.getTypeFamily();
+    return coll.parallelDo(new DoFn<T, Pair<T, Boolean>>() {
+      @Override
+      public void process(T input, Emitter<Pair<T, Boolean>> emitter) {
+        emitter.emit(Pair.of(input, Boolean.TRUE));
+      }
+    }, typeFamily.tableOf(coll.getPType(), typeFamily.booleans()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
new file mode 100644
index 0000000..23bcaee
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.lib.sort.Comparators.*;
+import static org.apache.crunch.lib.sort.SortFns.*;
+
+import org.apache.avro.Schema;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.GroupingOptions.Builder;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.util.PartitionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utilities for sorting {@code PCollection} instances.
+ */
+public class Sort {
+
+  /**
+   * Signals the direction in which a sort should be performed.
+   */
+  public enum Order {
+    ASCENDING, DESCENDING,
+    // NOTE(review): nothing in this class special-cases IGNORE; presumably it marks a
+    // column that should not influence the ordering - confirm against the comparators.
+    IGNORE
+  }
+
+  /**
+   * To sort by column 2 ascending then column 1 descending, you would use:
+   * <code>
+   * sortPairs(coll, by(2, ASCENDING), by(1, DESCENDING))
+   * </code> Column numbering is 1-based.
+   */
+  public static class ColumnOrder {
+    private int column;
+    private Order order;
+
+    public ColumnOrder(int column, Order order) {
+      this.column = column;
+      this.order = order;
+    }
+
+    public static ColumnOrder by(int column, Order order) {
+      return new ColumnOrder(column, order);
+    }
+
+    public int column() {
+      return column;
+    }
+    
+    public Order order() {
+      return order;
+    }
+    
+    @Override
+    public String toString() {
+      return "ColumnOrder: column:" + column + ", Order: " + order;
+    }
+  }
+
+  /**
+   * Sorts the {@code PCollection} by the natural ordering of its elements,
+   * ascending. Delegates to {@link #sort(PCollection, Order)}.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection) {
+    PCollection<T> ascending = sort(collection, Order.ASCENDING);
+    return ascending;
+  }
+
+  /**
+   * Sorts the {@code PCollection} by the natural ordering of its elements in
+   * the given {@code Order}. A reducer count of -1 is passed on, which leaves
+   * the choice of reducer count to {@code configureReducers}.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
+    final int unspecifiedReducers = -1;
+    return sort(collection, unspecifiedReducers, order);
+  }
+  
+  /**
+   * Sorts the {@code PCollection} by the natural ordering of its elements in
+   * the given order, using the given number of reducers. Each element is first
+   * turned into the key of a table with null values; that table is grouped by
+   * key with sort-specific grouping options and the keys are read back.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order) {
+    PTypeFamily family = collection.getTypeFamily();
+    DoFn<T, Pair<T, Void>> toKeyFn = new DoFn<T, Pair<T, Void>>() {
+      @Override
+      public void process(T input, Emitter<Pair<T, Void>> emitter) {
+        Void noValue = null;
+        emitter.emit(Pair.of(input, noValue));
+      }
+    };
+    PTableType<T, Void> keyedType = family.tableOf(collection.getPType(), family.nulls());
+    PTable<T, Void> keyed = collection.parallelDo("sort-pre", toKeyFn, keyedType);
+
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(keyed, conf, numReducers, order);
+    return keyed.groupByKey(options).ungroup().keys();
+  }
+
+  /**
+   * Sorts the {@code PTable} using the natural ordering of its keys in ascending order.
+   * 
+   * @return a {@code PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
+    return sort(table, Order.ASCENDING);
+  }
+
+  /**
+   * Sorts the {@code PTable} by the natural ordering of its keys in the given
+   * {@code Order}. A reducer count of -1 is passed on, which leaves the choice
+   * of reducer count to {@code configureReducers}.
+   *
+   * @return a {@code PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) {
+    final int unspecifiedReducers = -1;
+    return sort(table, unspecifiedReducers, key);
+  }
+  
+  /**
+   * Sorts the {@code PTable} by the natural ordering of its keys in the given
+   * order, using a client-specified number of reducers. The table is grouped
+   * by key with sort-specific grouping options and then ungrouped.
+   *
+   * @return a {@code PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key) {
+    GroupingOptions sortOptions =
+        buildGroupingOptions(table, table.getPipeline().getConfiguration(), numReducers, key);
+    return table.groupByKey(sortOptions).ungroup();
+  }
+
+  
+  /**
+   * Sorts the {@code PCollection} of {@code Pair}s using the specified column
+   * ordering.
+   * 
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
+      ColumnOrder... columnOrders) {
+    return sortTuples(collection, columnOrders);
+  }
+
+  /**
+   * Sorts the {@code PCollection} of {@code Tuple3}s using the specified column
+   * ordering.
+   * 
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
+      ColumnOrder... columnOrders) {
+    return sortTuples(collection, columnOrders);
+  }
+
+  /**
+   * Sorts a {@code PCollection} of {@code Tuple4}s according to the given
+   * column orderings. Delegates to
+   * {@link #sortTuples(PCollection, ColumnOrder...)}.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
+      PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
+    PCollection<Tuple4<V1, V2, V3, V4>> sorted = sortTuples(collection, columnOrders);
+    return sorted;
+  }
+
+  /**
+   * Sorts a {@code PCollection} of tuples according to the given column
+   * orderings. A reducer count of -1 is passed on, which leaves the choice of
+   * reducer count to {@code configureReducers}.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection,
+      ColumnOrder... columnOrders) {
+    final int unspecifiedReducers = -1;
+    return sortTuples(collection, unspecifiedReducers, columnOrders);
+  }
+  
+  /**
+   * Sorts a {@code PCollection} of tuples according to the given column
+   * orderings, using a client-specified number of reducers. A
+   * {@code KeyExtraction} built from the element type and the column orderings
+   * supplies the key function and key type; the collection is keyed with
+   * them, grouped by key with sort-specific grouping options, and the values
+   * are read back.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers,
+      ColumnOrder... columnOrders) {
+    KeyExtraction<T> extraction = new KeyExtraction<T>(collection.getPType(), columnOrders);
+    PTable<Object, T> keyed = collection.by(extraction.getByFn(), extraction.getKeyType());
+    GroupingOptions sortOptions = buildGroupingOptions(
+        keyed, collection.getPipeline().getConfiguration(), numReducers, columnOrders);
+    return keyed.groupByKey(sortOptions).ungroup().values();
+  }
+
+  // TODO: move to type family?
+  /**
+   * Builds the grouping options for sorting a table by its keys in a single
+   * direction.
+   * <ul>
+   * <li>Avro keys: the key schema is always stored under "crunch.schema"; for
+   * a descending sort the reverse Avro comparator is installed as well.</li>
+   * <li>Writable keys: for a descending sort the reverse Writable comparator is
+   * installed; otherwise no comparator is set here.</li>
+   * <li>Any other type family: a descending sort is rejected with a
+   * {@code RuntimeException}; otherwise nothing is configured here.</li>
+   * </ul>
+   * Reducer count and partitioning are then configured by
+   * {@code configureReducers}.
+   */
+  private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+      int numReducers, Order order) {
+    Builder builder = GroupingOptions.builder();
+    PTypeFamily family = ptable.getTypeFamily();
+    boolean descending = (order == Order.DESCENDING);
+
+    if (family == AvroTypeFamily.getInstance()) {
+      AvroType<K> avroKeyType = (AvroType<K>) ptable.getKeyType();
+      Schema keySchema = avroKeyType.getSchema();
+      builder.conf("crunch.schema", keySchema.toString());
+      if (descending) {
+        builder.sortComparatorClass(ReverseAvroComparator.class);
+      }
+    } else if (descending) {
+      if (family == WritableTypeFamily.getInstance()) {
+        builder.sortComparatorClass(ReverseWritableComparator.class);
+      } else {
+        throw new RuntimeException("Unrecognized type family: " + family);
+      }
+    }
+
+    configureReducers(builder, ptable, conf, numReducers);
+    return builder.build();
+  }
+
+  /**
+   * Builds the grouping options for sorting a table whose keys were extracted
+   * according to a list of column orderings.
+   * <ul>
+   * <li>Writable keys: a single descending column uses the reverse Writable
+   * comparator; any other combination records the orderings in the
+   * configuration and uses the tuple Writable comparator.</li>
+   * <li>Avro keys: the key schema is stored under "crunch.schema"; a single
+   * descending column additionally uses the reverse Avro comparator.</li>
+   * <li>Any other type family is rejected with a {@code RuntimeException}.</li>
+   * </ul>
+   * Reducer count and partitioning are then configured by
+   * {@code configureReducers}.
+   */
+  private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+      int numReducers, ColumnOrder[] columnOrders) {
+    PTypeFamily family = ptable.getTypeFamily();
+    boolean writable = (family == WritableTypeFamily.getInstance());
+    if (!writable && family != AvroTypeFamily.getInstance()) {
+      throw new RuntimeException("Unrecognized type family: " + family);
+    }
+
+    Builder builder = GroupingOptions.builder();
+    boolean singleDescendingColumn =
+        columnOrders.length == 1 && columnOrders[0].order() == Order.DESCENDING;
+    if (writable) {
+      if (singleDescendingColumn) {
+        builder.sortComparatorClass(ReverseWritableComparator.class);
+      } else {
+        TupleWritableComparator.configureOrdering(conf, columnOrders);
+        builder.sortComparatorClass(TupleWritableComparator.class);
+      }
+    } else {
+      // Only the Avro family can reach this branch.
+      AvroType<K> avroKeyType = (AvroType<K>) ptable.getKeyType();
+      builder.conf("crunch.schema", avroKeyType.getSchema().toString());
+      if (singleDescendingColumn) {
+        builder.sortComparatorClass(ReverseAvroComparator.class);
+      }
+    }
+
+    configureReducers(builder, ptable, conf, numReducers);
+    return builder.build();
+  }
+
+  /**
+   * Sets the reducer count on the builder and, when more than one reducer is
+   * used, the partitioning.
+   * <p>
+   * A non-positive {@code numReducers} is replaced by the value recommended by
+   * {@code PartitionUtils}; a recommendation below 5 is forced down to 1. With
+   * more than one reducer, {@code numReducers - 1} keys are sampled and
+   * materialized, {@link TotalOrderPartitioner} is installed, and the path of
+   * the materialized sample is stored under
+   * {@code TotalOrderPartitioner.PARTITIONER_PATH}.
+   */
+  private static <K, V> void configureReducers(GroupingOptions.Builder builder,
+      PTable<K, V> ptable, Configuration conf, int numReducers) {
+    int reducers = numReducers;
+    if (reducers <= 0) {
+      int recommended = PartitionUtils.getRecommendedPartitions(ptable, conf);
+      // Not worth the overhead for small recommendations, force it to 1
+      reducers = (recommended < 5) ? 1 : recommended;
+    }
+    builder.numReducers(reducers);
+    if (reducers <= 1) {
+      return;
+    }
+
+    // NOTE(review): the sampled keys are presumably the split points read by the partitioner - confirm.
+    Iterable<K> sampledKeys = Sample.reservoirSample(ptable.keys(), reducers - 1).materialize();
+    MaterializableIterable<K> materialized = (MaterializableIterable<K>) sampledKeys;
+    if (materialized.isSourceTarget()) {
+      builder.sourceTarget((SourceTarget) materialized.getSource());
+    }
+    builder.partitionerClass(TotalOrderPartitioner.class);
+    builder.conf(TotalOrderPartitioner.PARTITIONER_PATH, materialized.getPath().toString());
+    //TODO: distcache handling
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
new file mode 100644
index 0000000..c0ce727
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an full outer join.
+ * 
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient int lastId;
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public FullOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    super.initialize();
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // Make sure that left side gets emitted.
+      if (0 == lastId) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+        }
+      }
+      lastKey = keyType.getDetachedValue(key);
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        // Make sure that right side gets emitted.
+        if (leftValues.isEmpty()) {
+          leftValues.add(null);
+        }
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      for (U u : leftValues) {
+        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() {
+    return "fullOuterJoin";
+  }
+}