You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/03/24 22:43:06 UTC

git commit: CRUNCH-51: Working total sort implementation w/example instance for testing the impl.

Updated Branches:
  refs/heads/master 5d30af0c0 -> e20cbf089


CRUNCH-51: Working total sort implementation w/example instance for testing the impl.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e20cbf08
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e20cbf08
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e20cbf08

Branch: refs/heads/master
Commit: e20cbf089ddf5c7992c7d5f76f050bea8d9eaecc
Parents: 5d30af0
Author: Josh Wills <jw...@apache.org>
Authored: Wed Mar 13 00:45:07 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Mar 24 13:32:36 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/examples/SortExample.java    |   53 ++
 .../java/org/apache/crunch/GroupingOptions.java    |   41 ++-
 .../crunch/impl/mem/collect/MemCollection.java     |    2 +-
 .../crunch/impl/mem/collect/MemGroupedTable.java   |    2 +-
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |   17 +-
 .../crunch/io/avro/AvroFileReaderFactory.java      |   11 +-
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |   13 +-
 .../src/main/java/org/apache/crunch/lib/Sort.java  |  460 ++++-----------
 .../org/apache/crunch/lib/join/MapsideJoin.java    |   30 +-
 .../org/apache/crunch/lib/sort/Comparators.java    |  187 ++++++
 .../java/org/apache/crunch/lib/sort/SortFns.java   |  210 +++++++
 .../crunch/lib/sort/TotalOrderPartitioner.java     |  145 +++++
 .../org/apache/crunch/types/PGroupedTableType.java |    4 +-
 .../main/java/org/apache/crunch/types/PType.java   |    4 +-
 .../org/apache/crunch/types/avro/AvroType.java     |    4 +-
 .../crunch/types/writable/WritableTableType.java   |    4 +-
 .../apache/crunch/types/writable/WritableType.java |    4 +-
 .../java/org/apache/crunch/util/DistCache.java     |   23 +-
 .../org/apache/crunch/util/PartitionUtils.java     |   34 ++
 19 files changed, 850 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
new file mode 100644
index 0000000..a7cd8b7
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.util.CrunchTool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Simple Crunch tool for running sorting examples from the command line.
+ */
+public class SortExample extends CrunchTool {
+
+  public SortExample() {
+    super();
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 3) {
+      System.err.println("Usage: <input-path> <output-path> <num-reducers>");
+      return 1;
+    }
+    
+    PCollection<String> in = readTextFile(args[0]);
+    writeTextFile(Sort.sort(in, Integer.valueOf(args[2]), Order.ASCENDING), args[1]);
+    done();
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new SortExample(), args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
index ea2d6c6..4aa1343 100644
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -17,10 +17,16 @@
  */
 package org.apache.crunch;
 
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * Options that can be passed to a {@code groupByKey} operation in order to
  * exercise finer control over how the partitioning, grouping, and sorting of
@@ -33,14 +39,18 @@ public class GroupingOptions {
   private final Class<? extends RawComparator> groupingComparatorClass;
   private final Class<? extends RawComparator> sortComparatorClass;
   private final int numReducers;
-
+  private final Map<String, String> extraConf;
+  private final Set<SourceTarget<?>> sourceTargets;
+  
   private GroupingOptions(Class<? extends Partitioner> partitionerClass,
       Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
-      int numReducers) {
+      int numReducers, Map<String, String> extraConf, Set<SourceTarget<?>> sourceTargets) {
     this.partitionerClass = partitionerClass;
     this.groupingComparatorClass = groupingComparatorClass;
     this.sortComparatorClass = sortComparatorClass;
     this.numReducers = numReducers;
+    this.extraConf = extraConf;
+    this.sourceTargets = sourceTargets;
   }
 
   public int getNumReducers() {
@@ -59,6 +69,10 @@ public class GroupingOptions {
     return partitionerClass;
   }
   
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return sourceTargets;
+  }
+  
   public void configure(Job job) {
     if (partitionerClass != null) {
       job.setPartitionerClass(partitionerClass);
@@ -72,6 +86,9 @@ public class GroupingOptions {
     if (numReducers > 0) {
       job.setNumReduceTasks(numReducers);
     }
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      job.getConfiguration().set(e.getKey(), e.getValue());
+    }
   }
 
   public boolean isCompatibleWith(GroupingOptions other) {
@@ -84,6 +101,9 @@ public class GroupingOptions {
     if (sortComparatorClass != other.sortComparatorClass) {
       return false;
     }
+    if (!extraConf.equals(other.extraConf)) {
+      return false;
+    }
     return true;
   }
 
@@ -100,7 +120,9 @@ public class GroupingOptions {
     private Class<? extends RawComparator> groupingComparatorClass;
     private Class<? extends RawComparator> sortComparatorClass;
     private int numReducers;
-
+    private Map<String, String> extraConf = Maps.newHashMap();
+    private Set<SourceTarget<?>> sourceTargets = Sets.newHashSet();
+    
     public Builder() {
     }
 
@@ -127,8 +149,19 @@ public class GroupingOptions {
       return this;
     }
 
+    public Builder conf(String confKey, String confValue) {
+      this.extraConf.put(confKey, confValue);
+      return this;
+    }
+    
+    public Builder sourceTarget(SourceTarget<?> st) {
+      this.sourceTargets.add(st);
+      return this;
+    }
+    
     public GroupingOptions build() {
-      return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass, numReducers);
+      return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass,
+          numReducers, extraConf, sourceTargets);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index da7c798..c97fac6 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -182,7 +182,7 @@ public class MemCollection<S> implements PCollection<S> {
 
   @Override
   public long getSize() {
-    return collect.size();
+    return collect.isEmpty() ? 0 : 1; // getSize is only used for pipeline optimization in MR
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index ee27ecc..d105bb4 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -88,7 +88,7 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
 
   @Override
   public long getSize() {
-    return parent.getSize();
+    return 1; // getSize is only used for pipeline optimization in MR
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 4eb6e9c..ccac5d5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.impl.mr.collect;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,13 +30,17 @@ import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.util.PartitionUtils;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
 
@@ -59,8 +64,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   public void configureShuffle(Job job) {
     ptype.configureShuffle(job, groupingOptions);
     if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
-      long bytesPerTask = job.getConfiguration().getLong("crunch.bytes.per.reduce.task", (1000L * 1000L * 1000L));
-      int numReduceTasks = 1 + (int) (getSize() / bytesPerTask);
+      int numReduceTasks = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
       if (numReduceTasks > 0) {
         job.setNumReduceTasks(numReduceTasks);
         LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks));
@@ -109,6 +113,15 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   }
 
   @Override
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
+    if (groupingOptions != null) {
+      td.addAll(groupingOptions.getSourceTargets());
+    }
+    return ImmutableSet.copyOf(td);
+  }
+  
+  @Override
   public List<PCollectionImpl<?>> getParents() {
     return ImmutableList.<PCollectionImpl<?>> of(parent);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 2f8c1e3..c8fe23a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.avro;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
@@ -29,6 +30,7 @@ import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.avro.AvroType;
@@ -39,7 +41,7 @@ import org.apache.hadoop.fs.Path;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.UnmodifiableIterator;
 
-class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
 
@@ -47,10 +49,15 @@ class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
   private final MapFn<T, T> mapFn;
 
   public AvroFileReaderFactory(AvroType<T> atype) {
-    this.recordReader = AvroFileReaderFactory.createDatumReader(atype);
+    this.recordReader = createDatumReader(atype);
     this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
   }
 
+  public AvroFileReaderFactory(Schema schema) {
+    this.recordReader = new GenericDatumReader<T>(schema);
+    this.mapFn = IdentityFn.<T>getInstance();
+  }
+  
   static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
     if (avroType.hasReflect()) {
       if (avroType.hasSpecific()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 2f32746..3f45644 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -23,21 +23,24 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.UnmodifiableIterator;
 
-class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
+public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
 
@@ -59,6 +62,14 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
     }
   }
 
+  public SeqFileReaderFactory(Class clazz) {
+    PType<T> ptype = Writables.writables(clazz);
+    this.converter = ptype.getConverter();
+    this.mapFn = ptype.getInputMapFn();
+    this.key = NullWritable.get();
+    this.value = (Writable) ReflectionUtils.newInstance(clazz, null);
+  }
+  
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
     mapFn.initialize();

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
index cca5a79..23bcaee 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -17,55 +17,41 @@
  */
 package org.apache.crunch.lib;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import static org.apache.crunch.lib.sort.Comparators.*;
+import static org.apache.crunch.lib.sort.SortFns.*;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.GroupingOptions.Builder;
-import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.crunch.util.PartitionUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 
 /**
  * Utilities for sorting {@code PCollection} instances.
  */
 public class Sort {
 
+  /**
+   * For signaling the order in which a sort should be done.
+   */
   public enum Order {
     ASCENDING,
     DESCENDING,
@@ -79,8 +65,8 @@ public class Sort {
    * </code> Column numbering is 1-based.
    */
   public static class ColumnOrder {
-    int column;
-    Order order;
+    private int column;
+    private Order order;
 
     public ColumnOrder(int column, Order order) {
       this.column = column;
@@ -91,6 +77,14 @@ public class Sort {
       return new ColumnOrder(column, order);
     }
 
+    public int column() {
+      return column;
+    }
+    
+    public Order order() {
+      return order;
+    }
+    
     @Override
     public String toString() {
       return "ColumnOrder: column:" + column + ", Order: " + order;
@@ -98,211 +92,79 @@ public class Sort {
   }
 
   /**
-   * Sorts the {@link PCollection} using the natural ordering of its elements.
+   * Sorts the {@code PCollection} using the natural ordering of its elements in ascending order.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
   public static <T> PCollection<T> sort(PCollection<T> collection) {
     return sort(collection, Order.ASCENDING);
   }
 
   /**
-   * Sorts the {@link PCollection} using the natural ordering of its elements in
-   * the order specified.
+   * Sorts the {@code PCollection} using the natural order of its elements with the given {@code Order}.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
   public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
+    return sort(collection, -1, order);
+  }
+  
+  /**
+   * Sorts the {@code PCollection} using the natural ordering of its elements in
+   * the order specified using the given number of reducers.
+   * 
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order) {
     PTypeFamily tf = collection.getTypeFamily();
     PTableType<T, Void> type = tf.tableOf(collection.getPType(), tf.nulls());
     Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, collection.getPType(), order);
     PTable<T, Void> pt = collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
       @Override
       public void process(T input, Emitter<Pair<T, Void>> emitter) {
         emitter.emit(Pair.of(input, (Void) null));
       }
     }, type);
+    GroupingOptions options = buildGroupingOptions(pt, conf, numReducers, order);
     return pt.groupByKey(options).ungroup().keys();
   }
 
   /**
-   * Sorts the {@link PTable} using the natural ordering of its keys.
+   * Sorts the {@code PTable} using the natural ordering of its keys in ascending order.
    * 
-   * @return a {@link PTable} representing the sorted table.
+   * @return a {@code PTable} representing the sorted table.
    */
   public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
     return sort(table, Order.ASCENDING);
   }
 
   /**
-   * Sorts the {@link PTable} using the natural ordering of its keys in the
-   * order specified.
-   * 
-   * @return a {@link PTable} representing the sorted collection.
+   * Sorts the {@code PTable} using the natural ordering of its keys with the given {@code Order}.
+   *
+   * @return a {@code PTable} representing the sorted table.
    */
   public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) {
-    PTypeFamily tf = table.getTypeFamily();
-    Configuration conf = table.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, table.getKeyType(), key);
-    return table.groupByKey(options).ungroup();
-  }
-
-  static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
-    private final int index;
-    
-    public SingleKeyFn(int index) {
-      this.index = index;
-    }
-
-    @Override
-    public K map(V input) {
-      return (K) input.get(index);
-    }
-  }
-  
-  static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
-    private final int[] indices;
-    private final TupleFactory tupleFactory;
-    
-    public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
-      this.indices = indices;
-      this.tupleFactory = tupleFactory;
-    }
-    
-    @Override
-    public K map(V input) {
-      Object[] values = new Object[indices.length];
-      for (int i = 0; i < indices.length; i++) {
-        values[i] = input.get(indices[i]);
-      }
-      return (K) tupleFactory.makeTuple(values);
-    }
+    return sort(table, -1, key);
   }
   
-  static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
-
-    private final int[] indices;
-    private final String schemaJson;
-    private transient Schema schema;
-    
-    public AvroGenericFn(int[] indices, Schema schema) {
-      this.indices = indices;
-      this.schemaJson = schema.toString();
-    }
-    
-    @Override
-    public void initialize() {
-      this.schema = (new Schema.Parser()).parse(schemaJson);
-    }
-    
-    @Override
-    public GenericRecord map(V input) {
-      GenericRecord rec = new GenericData.Record(schema);
-      for (int i = 0; i < indices.length; i++) {
-        rec.put(i, input.get(indices[i]));
-      }
-      return rec;
-    }
-  }
-  
-  static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
-    // Guarantee each tuple schema has a globally unique name
-    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-    List<Schema.Field> fields = Lists.newArrayList();
-    AvroType<S> parentAvroType = (AvroType<S>) ptype;
-    Schema parentAvroSchema = parentAvroType.getSchema();
-
-    for (int index = 0; index < orders.length; index++) {
-      ColumnOrder columnOrder = orders[index];
-      AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
-      Schema fieldSchema = atype.getSchema();
-      String fieldName = parentAvroSchema.getFields().get(index).name();
-      // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
-      // Text instances: making this consistent
-      Schema.Field.Order order = columnOrder.order == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
-        Schema.Field.Order.ASCENDING;
-      fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
-    }
-    schema.setFields(fields);
-    return schema;
+  /**
+   * Sorts the {@code PTable} using the natural ordering of its keys in the
+   * order specified with a client-specified number of reducers.
+   * 
+   * @return a {@code PTable} representing the sorted collection.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key) {
+    Configuration conf = table.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(table, conf, numReducers, key);
+    return table.groupByKey(options).ungroup();
   }
 
-  static class KeyExtraction<V extends Tuple> {
-
-    private PType<V> ptype;
-    private final ColumnOrder[] columnOrder;
-    private final int[] cols;
-    
-    private MapFn<V, Object> byFn;
-    private PType<Object> keyPType;
-    
-    public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
-      this.ptype = ptype;
-      this.columnOrder = columnOrder;
-      this.cols = new int[columnOrder.length];
-      for (int i = 0; i < columnOrder.length; i++) {
-        cols[i] = columnOrder[i].column - 1;
-      }
-      init();
-    }
-    
-    private void init() {
-      List<PType> pt = ptype.getSubTypes();
-      PTypeFamily ptf = ptype.getFamily();
-      if (cols.length == 1) {
-        byFn = new SingleKeyFn(cols[0]);
-        keyPType = pt.get(cols[0]);
-      } else {
-        TupleFactory tf = null;
-        switch (cols.length) {
-        case 2:
-          tf = TupleFactory.PAIR;
-          keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
-          break;
-        case 3:
-          tf = TupleFactory.TUPLE3;
-          keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
-          break;
-        case 4:
-          tf = TupleFactory.TUPLE4;
-          keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
-          break;
-        default:
-          PType[] pts = new PType[cols.length];
-          for (int i = 0; i < pts.length; i++) {
-            pts[i] = pt.get(cols[i]);
-          }
-          tf = TupleFactory.TUPLEN;
-          keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
-        }
-        
-        if (ptf == AvroTypeFamily.getInstance()) {
-          Schema s = createOrderedTupleSchema(keyPType, columnOrder);
-          keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
-          byFn = new AvroGenericFn(cols, s);
-        } else {
-          byFn = new TupleKeyFn(cols, tf);
-        }
-      }
-      
-    }
-
-    public MapFn<V, Object> getByFn() {
-      return byFn;
-    }
-    
-    public PType<Object> getKeyType() {
-      return keyPType;
-    }
-  }
   
   /**
-   * Sorts the {@link PCollection} of {@link Pair}s using the specified column
+   * Sorts the {@code PCollection} of {@code Pair}s using the specified column
    * ordering.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
   public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
       ColumnOrder... columnOrders) {
@@ -310,10 +172,10 @@ public class Sort {
   }
 
   /**
-   * Sorts the {@link PCollection} of {@link Tuple3}s using the specified column
+   * Sorts the {@code PCollection} of {@code Tuple3}s using the specified column
    * ordering.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
   public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
       ColumnOrder... columnOrders) {
@@ -321,10 +183,10 @@ public class Sort {
   }
 
   /**
-   * Sorts the {@link PCollection} of {@link Tuple4}s using the specified column
+   * Sorts the {@code PCollection} of {@code Tuple4}s using the specified column
    * ordering.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
   public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
       PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
@@ -332,45 +194,59 @@ public class Sort {
   }
 
   /**
-   * Sorts the {@link PCollection} of {@link TupleN}s using the specified column
-   * ordering.
+   * Sorts the {@code PCollection} of tuples using the specified column ordering.
+   *
+   * @return a {@code PCollection} representing the sorted collection.
+   */
+  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection,
+      ColumnOrder... columnOrders) {
+    return sortTuples(collection, -1, columnOrders);
+  }
+  
+  /**
+   * Sorts the {@code PCollection} of {@link TupleN}s using the specified column
+   * ordering and a client-specified number of reducers.
    * 
-   * @return a {@link PCollection} representing the sorted collection.
+   * @return a {@code PCollection} representing the sorted collection.
    */
-  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
+  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers,
+      ColumnOrder... columnOrders) {
     PType<T> pType = collection.getPType();
     KeyExtraction<T> ke = new KeyExtraction<T>(pType, columnOrders);
     PTable<Object, T> pt = collection.by(ke.getByFn(), ke.getKeyType());
     Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, ke.getKeyType(), pType, columnOrders);
+    GroupingOptions options = buildGroupingOptions(pt, conf, numReducers, columnOrders);
     return pt.groupByKey(options).ungroup().values();
   }
 
   // TODO: move to type family?
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
-      Order order) {
+  private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+      int numReducers, Order order) {
+    PType<K> ptype = ptable.getKeyType();
+    PTypeFamily tf = ptable.getTypeFamily();
     Builder builder = GroupingOptions.builder();
     if (order == Order.DESCENDING) {
       if (tf == WritableTypeFamily.getInstance()) {
         builder.sortComparatorClass(ReverseWritableComparator.class);
       } else if (tf == AvroTypeFamily.getInstance()) {
-        AvroType<T> avroType = (AvroType<T>) ptype;
+        AvroType<K> avroType = (AvroType<K>) ptype;
         Schema schema = avroType.getSchema();
-        conf.set("crunch.schema", schema.toString());
+        builder.conf("crunch.schema", schema.toString());
         builder.sortComparatorClass(ReverseAvroComparator.class);
       } else {
         throw new RuntimeException("Unrecognized type family: " + tf);
       }
+    } else if (tf == AvroTypeFamily.getInstance()) {
+      builder.conf("crunch.schema", ((AvroType<K>) ptype).getSchema().toString());
     }
-    // TODO:CRUNCH-23: Intermediate Fix for release 1. More elaborate fix is
-    // required check JIRA for details.
-    builder.numReducers(1);
+    configureReducers(builder, ptable, conf, numReducers);
     return builder.build();
   }
 
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> keyType,
-      PType<?> valueType, ColumnOrder[] columnOrders) {
+  private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+      int numReducers, ColumnOrder[] columnOrders) {
+    PTypeFamily tf = ptable.getTypeFamily();
+    PType<K> keyType = ptable.getKeyType();
     Builder builder = GroupingOptions.builder();
     if (tf == WritableTypeFamily.getInstance()) {
       if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
@@ -380,163 +256,39 @@ public class Sort {
         builder.sortComparatorClass(TupleWritableComparator.class);
       }
     } else if (tf == AvroTypeFamily.getInstance()) {
+      AvroType<K> avroType = (AvroType<K>) keyType;
+      Schema schema = avroType.getSchema();
+      builder.conf("crunch.schema", schema.toString());
       if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
-        AvroType<T> avroType = (AvroType<T>) keyType;
-        Schema schema = avroType.getSchema();
-        conf.set("crunch.schema", schema.toString());
         builder.sortComparatorClass(ReverseAvroComparator.class);
       }
     } else {
       throw new RuntimeException("Unrecognized type family: " + tf);
     }
-    // TODO:CRUNCH-23: Intermediate Fix for release 1. More elaborate fix is
-    // required check JIRA for details.
-    builder.numReducers(1);
+    configureReducers(builder, ptable, conf, numReducers);
     return builder.build();
   }
 
-  static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-
-    RawComparator<T> comparator;
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        JobConf jobConf = new JobConf(conf);
-        comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+  private static <K, V> void configureReducers(GroupingOptions.Builder builder,
+      PTable<K, V> ptable, Configuration conf, int numReducers) {
+    if (numReducers <= 0) {
+      numReducers = PartitionUtils.getRecommendedPartitions(ptable, conf);
+      if (numReducers < 5) {
+        // Not worth the overhead, force it to 1
+        numReducers = 1;
       }
     }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
-      return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return -comparator.compare(o1, o2);
-    }
-
-  }
-
-  static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
-
-    private Schema schema;
-
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+    builder.numReducers(numReducers);
+    if (numReducers > 1) {
+      Iterable<K> iter = Sample.reservoirSample(ptable.keys(), numReducers - 1).materialize();
+      MaterializableIterable<K> mi = (MaterializableIterable<K>) iter;
+      if (mi.isSourceTarget()) {
+        builder.sourceTarget((SourceTarget) mi.getSource());
       }
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return -ReflectData.get().compare(o1, o2, schema);
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
-      return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
-    }
-
-  }
-
-  static class TupleWritableComparator extends WritableComparator implements Configurable {
-
-    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-
-    Configuration conf;
-    ColumnOrder[] columnOrders;
-
-    public TupleWritableComparator() {
-      super(TupleWritable.class, true);
-    }
-
-    public static void configureOrdering(Configuration conf, Order... orders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY,
-          Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
-            @Override
-            public String apply(Order o) {
-              return o.name();
-            }
-          })));
-    }
-
-    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY,
-          Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
-            @Override
-            public String apply(ColumnOrder o) {
-              return o.column + ";" + o.order.name();
-            }
-          })));
-    }
-
-    @Override
-    public int compare(WritableComparable a, WritableComparable b) {
-      TupleWritable ta = (TupleWritable) a;
-      TupleWritable tb = (TupleWritable) b;
-      for (int index = 0; index < columnOrders.length; index++) {
-        int order = 1;
-        if (columnOrders[index].order == Order.ASCENDING) {
-          order = 1;
-        } else if (columnOrders[index].order == Order.DESCENDING) {
-          order = -1;
-        } else { // ignore
-          continue;
-        }
-        if (!ta.has(index) && !tb.has(index)) {
-          continue;
-        } else if (ta.has(index) && !tb.has(index)) {
-          return order;
-        } else if (!ta.has(index) && tb.has(index)) {
-          return -order;
-        } else {
-          Writable v1 = ta.get(index);
-          Writable v2 = tb.get(index);
-          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-            if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-              int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
-              if (cmp != 0) {
-                return order * cmp;
-              }
-            } else {
-              int cmp = v1.hashCode() - v2.hashCode();
-              if (cmp != 0) {
-                return order * cmp;
-              }
-            }
-          }
-        }
-      }
-      return 0; // ordering using specified cols found no differences
-    }
-
-    @Override
-    public Configuration getConf() {
-      return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      if (conf != null) {
-        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
-        String[] columnOrderNames = ordering.split(",");
-        columnOrders = new ColumnOrder[columnOrderNames.length];
-        for (int i = 0; i < columnOrders.length; i++) {
-          String[] split = columnOrderNames[i].split(";");
-          int column = Integer.parseInt(split[0]);
-          Order order = Order.valueOf(split[1]);
-          columnOrders[i] = ColumnOrder.by(column, order);
-
-        }
-      }
-    }
+      builder.partitionerClass(TotalOrderPartitioner.class);
+      builder.conf(TotalOrderPartitioner.PARTITIONER_PATH, mi.getPath().toString());
+      //TODO: distcache handling
+    }   
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index fa28155..56476c1 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -30,9 +30,8 @@ import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -72,7 +71,8 @@ public class MapsideJoin {
 
     if (iterable instanceof MaterializableIterable) {
       MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
-      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType());
+      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
+          right.getPType());
       ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
       if (mi.isSourceTarget()) {
         optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
@@ -120,32 +120,24 @@ public class MapsideJoin {
     }
 
     private Path getCacheFilePath() {
-      Path input = new Path(inputPath);
-      try {
-        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
-          if (localPath.toString().endsWith(input.getName())) {
-            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
-
-          }
-        }
-      } catch (IOException e) {
-        throw new CrunchRuntimeException(e);
+      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
+      if (local == null) {
+        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
       }
-
-      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+      return local;
     }
 
     @Override
     public void configure(Configuration conf) {
-      DistributedCache.addCacheFile(new Path(inputPath).toUri(), conf);
+      DistCache.addCacheFile(new Path(inputPath), conf);
     }
     
     @Override
     public void initialize() {
       super.initialize();
 
-      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
-          .getDefaultFileSource(getCacheFilePath());
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
+          getCacheFilePath());
       Iterable<Pair<K, V>> iterable = null;
       try {
         iterable = sourceTarget.read(getConfiguration());
@@ -168,7 +160,5 @@ public class MapsideJoin {
         emitter.emit(Pair.of(key, valuePair));
       }
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
new file mode 100644
index 0000000..ae7f49a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.Arrays;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
+ */
+public class Comparators {
+  
+  public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+
+    private RawComparator<T> comparator;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        JobConf jobConf = new JobConf(conf);
+        comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+      }
+    }
+
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+      return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+      return -comparator.compare(o1, o2);
+    }
+  }
+
+  public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
+
+    private Schema schema;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      }
+    }
+
+    @Override
+    public int compare(AvroKey<T> o1, AvroKey<T> o2) {
+      return -ReflectData.get().compare(o1.datum(), o2.datum(), schema);
+    }
+
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+      return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+    }
+  }
+
+  public static class TupleWritableComparator extends WritableComparator implements Configurable {
+
+    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+
+    private Configuration conf;
+    private ColumnOrder[] columnOrders;
+
+    public TupleWritableComparator() {
+      super(TupleWritable.class, true);
+    }
+
+    public static void configureOrdering(Configuration conf, Order... orders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
+            @Override
+            public String apply(Order o) {
+              return o.name();
+            }
+          })));
+    }
+
+    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
+            @Override
+            public String apply(ColumnOrder o) {
+              return o.column() + ";" + o.order().name();
+            }
+          })));
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      TupleWritable ta = (TupleWritable) a;
+      TupleWritable tb = (TupleWritable) b;
+      for (int index = 0; index < columnOrders.length; index++) {
+        int order = 1;
+        if (columnOrders[index].order() == Order.ASCENDING) {
+          order = 1;
+        } else if (columnOrders[index].order() == Order.DESCENDING) {
+          order = -1;
+        } else { // ignore
+          continue;
+        }
+        if (!ta.has(index) && !tb.has(index)) {
+          continue;
+        } else if (ta.has(index) && !tb.has(index)) {
+          return order;
+        } else if (!ta.has(index) && tb.has(index)) {
+          return -order;
+        } else {
+          Writable v1 = ta.get(index);
+          Writable v2 = tb.get(index);
+          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
+            if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+              int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+              if (cmp != 0) {
+                return order * cmp;
+              }
+            } else {
+              int cmp = v1.hashCode() - v2.hashCode();
+              if (cmp != 0) {
+                return order * cmp;
+              }
+            }
+          }
+        }
+      }
+      return 0; // ordering using specified cols found no differences
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      if (conf != null) {
+        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
+        String[] columnOrderNames = ordering.split(",");
+        columnOrders = new ColumnOrder[columnOrderNames.length];
+        for (int i = 0; i < columnOrders.length; i++) {
+          String[] split = columnOrderNames[i].split(";");
+          int column = Integer.parseInt(split[0]);
+          Order order = Order.valueOf(split[1]);
+          columnOrders[i] = ColumnOrder.by(column, order);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
new file mode 100644
index 0000000..be218f6
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
+ */
+public class SortFns {
+
+  /**
+   * Extracts a single indexed key from a {@code Tuple} instance.
+   */
+  public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+    private final int index;
+    
+    public SingleKeyFn(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public K map(V input) {
+      return (K) input.get(index);
+    }
+  }
+
+  /**
+   * Extracts a composite key from a {@code Tuple} instance.
+   */
+  public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+    private final int[] indices;
+    private final TupleFactory tupleFactory;
+    
+    public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+      this.indices = indices;
+      this.tupleFactory = tupleFactory;
+    }
+    
+    @Override
+    public K map(V input) {
+      Object[] values = new Object[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        values[i] = input.get(indices[i]);
+      }
+      return (K) tupleFactory.makeTuple(values);
+    }
+  }
+  
+  /**
+   * Pulls a composite set of keys from an Avro {@code GenericRecord} instance.
+   */
+  public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+
+    private final int[] indices;
+    private final String schemaJson;
+    private transient Schema schema;
+    
+    public AvroGenericFn(int[] indices, Schema schema) {
+      this.indices = indices;
+      this.schemaJson = schema.toString();
+    }
+    
+    @Override
+    public void initialize() {
+      this.schema = (new Schema.Parser()).parse(schemaJson);
+    }
+    
+    @Override
+    public GenericRecord map(V input) {
+      GenericRecord rec = new GenericData.Record(schema);
+      for (int i = 0; i < indices.length; i++) {
+        rec.put(i, input.get(indices[i]));
+      }
+      return rec;
+    }
+  }
+  
+  /**
+   * Constructs an Avro schema for the given {@code PType<S>} that respects the given column
+   * orderings.
+   */
+  public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+    // Guarantee each tuple schema has a globally unique name
+    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    AvroType<S> parentAvroType = (AvroType<S>) ptype;
+    Schema parentAvroSchema = parentAvroType.getSchema();
+
+    for (int index = 0; index < orders.length; index++) {
+      ColumnOrder columnOrder = orders[index];
+      AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+      Schema fieldSchema = atype.getSchema();
+      String fieldName = parentAvroSchema.getFields().get(index).name();
+      // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
+      // Text instances: making this consistent
+      Schema.Field.Order order = columnOrder.order() == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
+        Schema.Field.Order.ASCENDING;
+      fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
+    }
+    schema.setFields(fields);
+    return schema;
+  }
+
+  /**
+   * Utility class for encapsulating key extraction logic and serialization information about
+   * key extraction.
+   */
+  public static class KeyExtraction<V extends Tuple> {
+
+    private PType<V> ptype;
+    private final ColumnOrder[] columnOrder;
+    private final int[] cols;
+    
+    private MapFn<V, Object> byFn;
+    private PType<Object> keyPType;
+    
+    public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+      this.ptype = ptype;
+      this.columnOrder = columnOrder;
+      this.cols = new int[columnOrder.length];
+      for (int i = 0; i < columnOrder.length; i++) {
+        cols[i] = columnOrder[i].column() - 1;
+      }
+      init();
+    }
+    
+    private void init() {
+      List<PType> pt = ptype.getSubTypes();
+      PTypeFamily ptf = ptype.getFamily();
+      if (cols.length == 1) {
+        byFn = new SingleKeyFn(cols[0]);
+        keyPType = pt.get(cols[0]);
+      } else {
+        TupleFactory tf = null;
+        switch (cols.length) {
+        case 2:
+          tf = TupleFactory.PAIR;
+          keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
+          break;
+        case 3:
+          tf = TupleFactory.TUPLE3;
+          keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
+          break;
+        case 4:
+          tf = TupleFactory.TUPLE4;
+          keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
+          break;
+        default:
+          PType[] pts = new PType[cols.length];
+          for (int i = 0; i < pts.length; i++) {
+            pts[i] = pt.get(cols[i]);
+          }
+          tf = TupleFactory.TUPLEN;
+          keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
+        }
+        
+        if (ptf == AvroTypeFamily.getInstance()) {
+          Schema s = createOrderedTupleSchema(keyPType, columnOrder);
+          keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
+          byFn = new AvroGenericFn(cols, s);
+        } else {
+          byFn = new TupleKeyFn(cols, tf);
+        }
+      }
+      
+    }
+
+    public MapFn<V, Object> getByFn() {
+      return byFn;
+    }
+    
+    public PType<Object> getKeyType() {
+      return keyPType;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
new file mode 100644
index 0000000..94fbdbe
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partition-aware {@code Partitioner} instance that can work with either Avro or Writable-formatted
+ * keys.
+ */
+public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
+
+  public static final String DEFAULT_PATH = "_partition.lst";
+  public static final String PARTITIONER_PATH = 
+    "crunch.totalorderpartitioner.path";
+  
+  private Configuration conf;
+  private Node<K> partitions;
+  
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    try {
+      this.conf = conf;
+      String parts = getPartitionFile(conf);
+      final Path partFile = new Path(parts);
+      final FileSystem fs = (DEFAULT_PATH.equals(parts))
+        ? FileSystem.getLocal(conf)     // assume in DistributedCache
+        : partFile.getFileSystem(conf);
+
+      Job job = new Job(conf);
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      RawComparator<K> comparator =
+          (RawComparator<K>) job.getSortComparator();
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf, comparator);
+      int numReduceTasks = job.getNumReduceTasks();
+      if (splitPoints.length != numReduceTasks - 1) {
+        throw new IOException("Wrong number of partitions in keyset");
+      }
+      partitions = new BinarySearchNode(splitPoints, comparator);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+  @Override
+  public int getPartition(K key, V value, int modulo) {
+    return partitions.findPartition(key);
+  }
+
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set(PARTITIONER_PATH, p.toString());
+  }
+
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+  }
+  
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      Configuration conf, final RawComparator<K> comparator) throws IOException {
+    ArrayList<K> parts = new ArrayList<K>();
+    String schema = conf.get("crunch.schema");
+    if (schema != null) {
+      Schema s = (new Schema.Parser()).parse(schema);
+      AvroFileReaderFactory<K> a = new AvroFileReaderFactory<K>(s);
+      Iterator<K> iter = CompositePathIterable.create(fs, p, a).iterator();
+      while (iter.hasNext()) {
+        parts.add((K) new AvroKey<K>(iter.next()));
+      }
+    } else {
+      WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
+      SeqFileReaderFactory<K> s = new SeqFileReaderFactory<K>(keyClass);
+      Iterator<K> iter = CompositePathIterable.create(fs, p, s).iterator();
+      while (iter.hasNext()) {
+        parts.add((K) wdc.deepCopy((Writable) iter.next()));
+      }
+    }
+    Collections.sort(parts, comparator);
+    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+  }
+  
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+  
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+    public int findPartition(K key) {
+      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index e9c773c..d276cd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -24,7 +24,7 @@ import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -135,7 +135,7 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
   public abstract void configureShuffle(Job job, GroupingOptions options);
 
   @Override
-  public SourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
+  public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
     throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
index 565615a..ebddf84 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -77,7 +77,7 @@ public interface PType<T> extends Serializable {
    * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
    * specified by this {@code PType}.
    */
-  SourceTarget<T> getDefaultFileSource(Path path);
+  ReadableSourceTarget<T> getDefaultFileSource(Path path);
 
   /**
    * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index a0e2722..a92b0d0 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -24,8 +24,8 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
@@ -161,7 +161,7 @@ public class AvroType<T> implements PType<T> {
   }
 
   @Override
-  public SourceTarget<T> getDefaultFileSource(Path path) {
+  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
     return new AvroFileSourceTarget<T>(path, this);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index 2f75b94..93e0fd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -22,8 +22,8 @@ import java.util.List;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
@@ -98,7 +98,7 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   }
 
   @Override
-  public SourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
+  public ReadableSourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
     return new SeqFileTableSourceTarget<K, V>(path, this);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index fd64b3a..734946c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
@@ -90,7 +90,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   }
 
   @Override
-  public SourceTarget<T> getDefaultFileSource(Path path) {
+  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
     return new SeqFileSourceTarget<T>(path, this);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
index 891cc6c..3e49930 100644
--- a/crunch/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
@@ -80,6 +80,23 @@ public class DistCache {
     return value;
   }
 
+  public static void addCacheFile(Path path, Configuration conf) {
+    DistributedCache.addCacheFile(path.toUri(), conf);
+  }
+  
+  public static Path getPathToCacheFile(Path path, Configuration conf) {
+    try {
+      for (Path localPath : DistributedCache.getLocalCacheFiles(conf)) {
+        if (localPath.toString().endsWith(path.getName())) {
+          return localPath.makeQualified(FileSystem.getLocal(conf));
+        }
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return null;
+  }
+  
   /**
    * Adds the specified jar to the distributed cache of jobs using the provided
    * configuration. The jar will be placed on the classpath of tasks run by the
@@ -143,11 +160,11 @@ public class DistCache {
    * @throws IOException
    *           If there is a problem searching for the jar file.
    */
-  public static String findContainingJar(Class jarClass) throws IOException {
+  public static String findContainingJar(Class<?> jarClass) throws IOException {
     ClassLoader loader = jarClass.getClassLoader();
     String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
-    for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
-      URL url = (URL) itr.nextElement();
+    for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
+      URL url = itr.nextElement();
       if ("jar".equals(url.getProtocol())) {
         String toReturn = url.getPath();
         if (toReturn.startsWith("file:")) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
new file mode 100644
index 0000000..da8db6b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.crunch.PCollection;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public class PartitionUtils {
+  public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task";
+  public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L;
+  
+  public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
+    long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+    return 1 + (int) (pcollection.getSize() / bytesPerTask);
+  }
+}