Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/24 22:39:13 UTC

git commit: CRUNCH-167: Re-write the sorting strategy for tuples to only select the fields that we are using to sort on as the keys, and re-implement the wrapper functions to compress all of the different tuple sort methods (Pairs, Trips, etc.) into a single method.

Updated Branches:
  refs/heads/master 6cc8d8ec8 -> 855b0ddcb


CRUNCH-167: Re-write the sorting strategy for tuples to only select the fields that
we are using to sort on as the keys, and re-implement the wrapper functions to
compress all of the different tuple sort methods (Pairs, Trips, etc.) into a single
method.
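
For reference, the consolidated entry point is used the same way the new SortByValueIT
integration test (added below) uses it: build a PTable and pass it to Sort.sortPairs,
which now simply delegates to the generic sortTuples, along with 1-based ColumnOrder
arguments. A minimal sketch, assuming the tab-splitting SplitFn helper and the imports
from SortByValueIT.java, plus a local copy of the sort_by_value.txt fixture:

    Pipeline pipeline = new MRPipeline(SortByValueIT.class);
    PTypeFamily ptf = WritableTypeFamily.getInstance();
    PTable<String, Long> letterCounts = pipeline
        .read(From.textFile("sort_by_value.txt"))
        .parallelDo(new SplitFn("\t"), ptf.tableOf(ptf.strings(), ptf.longs()));
    // Column numbers are 1-based, so column 2 is the Long count, sorted largest-first.
    PCollection<Pair<String, Long>> sorted =
        Sort.sortPairs(letterCounts, new ColumnOrder(2, Order.DESCENDING));
    // The new test asserts this materializes as (C,3), (A,2), (D,2), (B,1), (E,1).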


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/855b0ddc
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/855b0ddc
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/855b0ddc

Branch: refs/heads/master
Commit: 855b0ddcb738611abfd9bed445403f18b63d2453
Parents: 6cc8d8e
Author: Josh Wills <jw...@apache.org>
Authored: Thu Feb 21 16:35:49 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Feb 24 13:02:55 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/SortByValueIT.java  |   81 ++++
 .../src/it/java/org/apache/crunch/lib/SortIT.java  |   16 +-
 crunch/src/it/resources/sort_by_value.txt          |    5 +
 .../src/main/java/org/apache/crunch/lib/Sort.java  |  353 ++++++++-------
 4 files changed, 278 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
new file mode 100644
index 0000000..c313351
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Integration tests for sorting a PTable on its value column via {@link Sort#sortPairs} and {@link ColumnOrder}.
+ */
+public class SortByValueIT {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  private static class SplitFn extends MapFn<String, Pair<String, Long>> {
+    private String sep;
+    
+    public SplitFn(String sep) {
+      this.sep = sep;
+    }
+    
+    @Override
+    public Pair<String, Long> map(String input) {
+      String[] pieces = input.split(sep);
+      return Pair.of(pieces[0], Long.valueOf(pieces[1]));
+    }
+  }
+  
+  @Test
+  public void testSortByValueWritables() throws Exception {
+    run(new MRPipeline(SortByValueIT.class), WritableTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testSortByValueAvro() throws Exception {
+    run(new MRPipeline(SortByValueIT.class), AvroTypeFamily.getInstance());
+  }
+  
+  public void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
+    String sbv = tmpDir.copyResourceFileName("sort_by_value.txt");
+    PTable<String, Long> letterCounts = pipeline.read(From.textFile(sbv)).parallelDo(new SplitFn("\t"),
+        ptf.tableOf(ptf.strings(), ptf.longs()));
+    PCollection<Pair<String, Long>> sorted = Sort.sortPairs(letterCounts, new ColumnOrder(2, Order.DESCENDING));
+    assertEquals(
+        ImmutableList.of(Pair.of("C", 3L), Pair.of("A", 2L), Pair.of("D", 2L), Pair.of("B", 1L), Pair.of("E", 1L)),
+        ImmutableList.copyOf(sorted.materialize()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index 3ea31ca..bad4864 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -49,7 +49,6 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -78,7 +77,7 @@ public class SortIT implements Serializable {
   }
 
   @Test
-  public void testWritableSortSecondDescFirstDesc() throws Exception {
+  public void testWritableSortSecondDescFirstAsc() throws Exception {
     runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
         "this doc has this text");
   }
@@ -117,14 +116,13 @@ public class SortIT implements Serializable {
   }
 
   @Test
-  public void testAvroSortPairAscAsc() throws Exception {
+  public void testAvroSortPairAscDesc() throws Exception {
     runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
         "this doc has this text");
   }
 
   @Test
-  @Ignore("Avro sorting only works in field order at the moment")
-  public void testAvroSortPairSecondAscFirstDesc() throws Exception {
+  public void testAvroSortPairSecondDescFirstAsc() throws Exception {
     runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
         "this doc has this text");
   }
@@ -220,15 +218,15 @@ public class SortIT implements Serializable {
     String inputPath = tmpDir.copyResourceFileName("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
-    PCollection<Pair<String, String>> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+    PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
       @Override
       public void process(String input, Emitter<Pair<String, String>> emitter) {
         String[] split = input.split("[\t]+");
         emitter.emit(Pair.of(split[0], split[1]));
       }
-    }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
     PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
-    Iterable<Pair<String, String>> lines = sorted.materialize();
+    List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
     Pair<String, String> l = lines.iterator().next();
     assertEquals(firstField, l.first());
     assertEquals(secondField, l.second());
@@ -250,7 +248,7 @@ public class SortIT implements Serializable {
           }
         }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
     PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
-    Iterable<Tuple3<String, String, String>> lines = sorted.materialize();
+    List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
     Tuple3<String, String, String> l = lines.iterator().next();
     assertEquals(firstField, l.first());
     assertEquals(secondField, l.second());

http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/resources/sort_by_value.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/sort_by_value.txt b/crunch/src/it/resources/sort_by_value.txt
new file mode 100644
index 0000000..73f7d11
--- /dev/null
+++ b/crunch/src/it/resources/sort_by_value.txt
@@ -0,0 +1,5 @@
+A	2
+B	1
+C	3
+D	2
+E	1

http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
index f2729a2..cca5a79 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -18,29 +18,33 @@
 package org.apache.crunch.lib;
 
 import java.util.Arrays;
-import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
 
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.GroupingOptions.Builder;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.hadoop.conf.Configurable;
@@ -54,7 +58,6 @@ import org.apache.hadoop.mapred.JobConf;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -120,13 +123,7 @@ public class Sort {
         emitter.emit(Pair.of(input, (Void) null));
       }
     }, type);
-    PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
-      @Override
-      public void process(Pair<T, Void> input, Emitter<T> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
+    return pt.groupByKey(options).ungroup().keys();
   }
 
   /**
@@ -151,6 +148,156 @@ public class Sort {
     return table.groupByKey(options).ungroup();
   }
 
+  static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+    private final int index;
+    
+    public SingleKeyFn(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public K map(V input) {
+      return (K) input.get(index);
+    }
+  }
+  
+  static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+    private final int[] indices;
+    private final TupleFactory tupleFactory;
+    
+    public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+      this.indices = indices;
+      this.tupleFactory = tupleFactory;
+    }
+    
+    @Override
+    public K map(V input) {
+      Object[] values = new Object[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        values[i] = input.get(indices[i]);
+      }
+      return (K) tupleFactory.makeTuple(values);
+    }
+  }
+  
+  static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+
+    private final int[] indices;
+    private final String schemaJson;
+    private transient Schema schema;
+    
+    public AvroGenericFn(int[] indices, Schema schema) {
+      this.indices = indices;
+      this.schemaJson = schema.toString();
+    }
+    
+    @Override
+    public void initialize() {
+      this.schema = (new Schema.Parser()).parse(schemaJson);
+    }
+    
+    @Override
+    public GenericRecord map(V input) {
+      GenericRecord rec = new GenericData.Record(schema);
+      for (int i = 0; i < indices.length; i++) {
+        rec.put(i, input.get(indices[i]));
+      }
+      return rec;
+    }
+  }
+  
+  static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+    // Guarantee each tuple schema has a globally unique name
+    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    AvroType<S> parentAvroType = (AvroType<S>) ptype;
+    Schema parentAvroSchema = parentAvroType.getSchema();
+
+    for (int index = 0; index < orders.length; index++) {
+      ColumnOrder columnOrder = orders[index];
+      AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+      Schema fieldSchema = atype.getSchema();
+      String fieldName = parentAvroSchema.getFields().get(index).name();
+      // Note: Avro sorting of strings is inverted relative to how WritableComparable Text
+      // instances sort; this keeps the two behaviors consistent.
+      Schema.Field.Order order = columnOrder.order == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
+        Schema.Field.Order.ASCENDING;
+      fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
+    }
+    schema.setFields(fields);
+    return schema;
+  }
+
+  static class KeyExtraction<V extends Tuple> {
+
+    private PType<V> ptype;
+    private final ColumnOrder[] columnOrder;
+    private final int[] cols;
+    
+    private MapFn<V, Object> byFn;
+    private PType<Object> keyPType;
+    
+    public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+      this.ptype = ptype;
+      this.columnOrder = columnOrder;
+      this.cols = new int[columnOrder.length];
+      for (int i = 0; i < columnOrder.length; i++) {
+        cols[i] = columnOrder[i].column - 1;
+      }
+      init();
+    }
+    
+    private void init() {
+      List<PType> pt = ptype.getSubTypes();
+      PTypeFamily ptf = ptype.getFamily();
+      if (cols.length == 1) {
+        byFn = new SingleKeyFn(cols[0]);
+        keyPType = pt.get(cols[0]);
+      } else {
+        TupleFactory tf = null;
+        switch (cols.length) {
+        case 2:
+          tf = TupleFactory.PAIR;
+          keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
+          break;
+        case 3:
+          tf = TupleFactory.TUPLE3;
+          keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
+          break;
+        case 4:
+          tf = TupleFactory.TUPLE4;
+          keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
+          break;
+        default:
+          PType[] pts = new PType[cols.length];
+          for (int i = 0; i < pts.length; i++) {
+            pts[i] = pt.get(cols[i]);
+          }
+          tf = TupleFactory.TUPLEN;
+          keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
+        }
+        
+        if (ptf == AvroTypeFamily.getInstance()) {
+          Schema s = createOrderedTupleSchema(keyPType, columnOrder);
+          keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
+          byFn = new AvroGenericFn(cols, s);
+        } else {
+          byFn = new TupleKeyFn(cols, tf);
+        }
+      }
+      
+    }
+
+    public MapFn<V, Object> getByFn() {
+      return byFn;
+    }
+    
+    public PType<Object> getKeyType() {
+      return keyPType;
+    }
+  }
+  
   /**
    * Sorts the {@link PCollection} of {@link Pair}s using the specified column
    * ordering.
@@ -159,28 +306,7 @@ public class Sort {
    */
   public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
       ColumnOrder... columnOrders) {
-    // put U and V into a pair/tuple in the key so we can do grouping and
-    // sorting
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Pair<U, V>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Pair<U, V>, Void> type = tf.tableOf(tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
-        tf.nulls());
-    PTable<Pair<U, V>, Void> pt = collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
-      @Override
-      public void process(Pair<U, V> input, Emitter<Pair<Pair<U, V>, Void>> emitter) {
-        emitter.emit(Pair.of(input, (Void) null));
-      }
-    }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>, Void>, Pair<U, V>>() {
-      @Override
-      public void process(Pair<Pair<U, V>, Void> input, Emitter<Pair<U, V>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
+    return sortTuples(collection, columnOrders);
   }
 
   /**
@@ -191,27 +317,7 @@ public class Sort {
    */
   public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
       ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Tuple3<V1, V2, V3>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf(
-        tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)), tf.nulls());
-    PTable<Tuple3<V1, V2, V3>, Void> pt = collection.parallelDo(
-        new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
-          @Override
-          public void process(Tuple3<V1, V2, V3> input, Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
-            emitter.emit(Pair.of(input, (Void) null));
-          }
-        }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>>() {
-      @Override
-      public void process(Pair<Tuple3<V1, V2, V3>, Void> input, Emitter<Tuple3<V1, V2, V3>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
+    return sortTuples(collection, columnOrders);
   }
 
   /**
@@ -222,27 +328,7 @@ public class Sort {
    */
   public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
       PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(tf.quads(pType.getSubTypes().get(0), pType.getSubTypes()
-        .get(1), pType.getSubTypes().get(2), pType.getSubTypes().get(3)), tf.nulls());
-    PTable<Tuple4<V1, V2, V3, V4>, Void> pt = collection.parallelDo(
-        new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
-          @Override
-          public void process(Tuple4<V1, V2, V3, V4> input, Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
-            emitter.emit(Pair.of(input, (Void) null));
-          }
-        }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>>() {
-      @Override
-      public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input, Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
+    return sortTuples(collection, columnOrders);
   }
 
   /**
@@ -251,25 +337,14 @@ public class Sort {
    * 
    * @return a {@link PCollection} representing the sorted collection.
    */
-  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection, ColumnOrder... columnOrders) {
+  public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) {
     PTypeFamily tf = collection.getTypeFamily();
-    PType<TupleN> pType = collection.getPType();
-    PTableType<TupleN, Void> type = tf.tableOf(tf.tuples(pType.getSubTypes().toArray(new PType[0])), tf.nulls());
-    PTable<TupleN, Void> pt = collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
-      @Override
-      public void process(TupleN input, Emitter<Pair<TupleN, Void>> emitter) {
-        emitter.emit(Pair.of(input, (Void) null));
-      }
-    }, type);
+    PType<T> pType = collection.getPType();
+    KeyExtraction<T> ke = new KeyExtraction<T>(pType, columnOrders);
+    PTable<Object, T> pt = collection.by(ke.getByFn(), ke.getKeyType());
     Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<TupleN, Void>, TupleN>() {
-      @Override
-      public void process(Pair<TupleN, Void> input, Emitter<TupleN> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
+    GroupingOptions options = buildGroupingOptions(conf, tf, ke.getKeyType(), pType, columnOrders);
+    return pt.groupByKey(options).ungroup().values();
   }
 
   // TODO: move to type family?
@@ -294,15 +369,23 @@ public class Sort {
     return builder.build();
   }
 
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
-      ColumnOrder[] columnOrders) {
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> keyType,
+      PType<?> valueType, ColumnOrder[] columnOrders) {
     Builder builder = GroupingOptions.builder();
     if (tf == WritableTypeFamily.getInstance()) {
-      TupleWritableComparator.configureOrdering(conf, columnOrders);
-      builder.sortComparatorClass(TupleWritableComparator.class);
+      if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
+        builder.sortComparatorClass(ReverseWritableComparator.class);
+      } else {
+        TupleWritableComparator.configureOrdering(conf, columnOrders);
+        builder.sortComparatorClass(TupleWritableComparator.class);
+      }
     } else if (tf == AvroTypeFamily.getInstance()) {
-      TupleAvroComparator.configureOrdering(conf, columnOrders, ptype);
-      builder.sortComparatorClass(TupleAvroComparator.class);
+      if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
+        AvroType<T> avroType = (AvroType<T>) keyType;
+        Schema schema = avroType.getSchema();
+        conf.set("crunch.schema", schema.toString());
+        builder.sortComparatorClass(ReverseAvroComparator.class);
+      }
     } else {
       throw new RuntimeException("Unrecognized type family: " + tf);
     }
@@ -340,7 +423,7 @@ public class Sort {
 
   static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
 
-    Schema schema;
+    private Schema schema;
 
     @Override
     public void setConf(Configuration conf) {
@@ -397,12 +480,11 @@ public class Sort {
     public int compare(WritableComparable a, WritableComparable b) {
       TupleWritable ta = (TupleWritable) a;
       TupleWritable tb = (TupleWritable) b;
-      for (int i = 0; i < columnOrders.length; i++) {
-        int index = columnOrders[i].column - 1;
+      for (int index = 0; index < columnOrders.length; index++) {
         int order = 1;
-        if (columnOrders[i].order == Order.ASCENDING) {
+        if (columnOrders[index].order == Order.ASCENDING) {
           order = 1;
-        } else if (columnOrders[i].order == Order.DESCENDING) {
+        } else if (columnOrders[index].order == Order.DESCENDING) {
           order = -1;
         } else { // ignore
           continue;
@@ -431,7 +513,7 @@ public class Sort {
           }
         }
       }
-      return 0; // ordering using specified columns found no differences
+      return 0; // ordering using specified cols found no differences
     }
 
     @Override
@@ -457,69 +539,4 @@ public class Sort {
     }
   }
 
-  static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
-
-    Schema schema;
-
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
-      }
-    }
-
-    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders, PType<S> ptype) {
-      Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
-      conf.set("crunch.schema", orderedSchema.toString());
-    }
-
-    // TODO: move to Avros
-    // TODO: need to re-order columns in map output then switch back in the
-    // reduce
-    // this will require more extensive changes in Crunch
-    private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
-      // Guarantee each tuple schema has a globally unique name
-      String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-      Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-      List<Schema.Field> fields = Lists.newArrayList();
-      AvroType<S> parentAvroType = (AvroType<S>) ptype;
-      Schema parentAvroSchema = parentAvroType.getSchema();
-
-      BitSet orderedColumns = new BitSet();
-      // First add any fields specified by ColumnOrder
-      for (ColumnOrder columnOrder : orders) {
-        int index = columnOrder.column - 1;
-        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
-        Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
-        String fieldName = parentAvroSchema.getFields().get(index).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.valueOf(columnOrder.order
-            .name())));
-        orderedColumns.set(index);
-      }
-      // Then add remaining fields from the ptypes, with no sort order
-      for (int i = 0; i < ptype.getSubTypes().size(); i++) {
-        if (orderedColumns.get(i)) {
-          continue;
-        }
-        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
-        Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
-        String fieldName = parentAvroSchema.getFields().get(i).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.IGNORE));
-      }
-      schema.setFields(fields);
-      return schema;
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return ReflectData.get().compare(o1, o2, schema);
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
-      return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
-    }
-
-  }
 }
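
As a rough illustration of the new key-extraction path (not part of the patch itself):
a multi-column sort such as Sort.sortPairs(kv, by(2, DESCENDING), by(1, ASCENDING)),
using SortIT's by(...) shorthand for ColumnOrder, no longer shuffles the whole tuple as
the key. KeyExtraction pulls only the selected fields into a narrower key (TupleKeyFn
for Writables, AvroGenericFn plus createOrderedTupleSchema for Avro), the group-by sorts
on that key, and the original tuple travels as the value and is recovered with values().
A hand-worked sketch of the key TupleKeyFn would build for one record:

    // Sort.sortPairs(kv, by(2, DESCENDING), by(1, ASCENDING)) on a Pair<String, String>:
    // the 1-based columns {2, 1} become the 0-based indices {1, 0}.
    Pair<String, String> value = Pair.of("A", "this doc has this text");
    // TupleKeyFn builds the shuffle key from just those fields, in sort order:
    Pair<String, String> key = Pair.of(value.second(), value.first());
    // key == ("this doc has this text", "A"); the group-by sorts descending on the first
    // component and ascending on the second (TupleWritableComparator for Writables, the
    // Avro field orders otherwise), and sortTuples recovers the original Pair afterwards.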