You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/24 22:39:13 UTC
git commit: CRUNCH-167: Re-write the sorting strategy for tuples to
only select the fields that we are using to sort on as the keys,
and re-implement the wrapper functions to compress all of the different tuple
sort methods (Pairs, Trips, etc.) into a single method.
Updated Branches:
refs/heads/master 6cc8d8ec8 -> 855b0ddcb
CRUNCH-167: Re-write the sorting strategy for tuples to only select the fields that
we are using to sort on as the keys, and re-implement the wrapper functions to
compress all of the different tuple sort methods (Pairs, Trips, etc.) into a single
method.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/855b0ddc
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/855b0ddc
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/855b0ddc
Branch: refs/heads/master
Commit: 855b0ddcb738611abfd9bed445403f18b63d2453
Parents: 6cc8d8e
Author: Josh Wills <jw...@apache.org>
Authored: Thu Feb 21 16:35:49 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Feb 24 13:02:55 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/lib/SortByValueIT.java | 81 ++++
.../src/it/java/org/apache/crunch/lib/SortIT.java | 16 +-
crunch/src/it/resources/sort_by_value.txt | 5 +
.../src/main/java/org/apache/crunch/lib/Sort.java | 353 ++++++++-------
4 files changed, 278 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
new file mode 100644
index 0000000..c313351
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ *
+ */
+public class SortByValueIT {
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ private static class SplitFn extends MapFn<String, Pair<String, Long>> {
+ private String sep;
+
+ public SplitFn(String sep) {
+ this.sep = sep;
+ }
+
+ @Override
+ public Pair<String, Long> map(String input) {
+ String[] pieces = input.split(sep);
+ return Pair.of(pieces[0], Long.valueOf(pieces[1]));
+ }
+ }
+
+ @Test
+ public void testSortByValueWritables() throws Exception {
+ run(new MRPipeline(SortByValueIT.class), WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testSortByValueAvro() throws Exception {
+ run(new MRPipeline(SortByValueIT.class), AvroTypeFamily.getInstance());
+ }
+
+ public void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
+ String sbv = tmpDir.copyResourceFileName("sort_by_value.txt");
+ PTable<String, Long> letterCounts = pipeline.read(From.textFile(sbv)).parallelDo(new SplitFn("\t"),
+ ptf.tableOf(ptf.strings(), ptf.longs()));
+ PCollection<Pair<String, Long>> sorted = Sort.sortPairs(letterCounts, new ColumnOrder(2, Order.DESCENDING));
+ assertEquals(
+ ImmutableList.of(Pair.of("C", 3L), Pair.of("A", 2L), Pair.of("D", 2L), Pair.of("B", 1L), Pair.of("E", 1L)),
+ ImmutableList.copyOf(sorted.materialize()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index 3ea31ca..bad4864 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -49,7 +49,6 @@ import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -78,7 +77,7 @@ public class SortIT implements Serializable {
}
@Test
- public void testWritableSortSecondDescFirstDesc() throws Exception {
+ public void testWritableSortSecondDescFirstAsc() throws Exception {
runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
"this doc has this text");
}
@@ -117,14 +116,13 @@ public class SortIT implements Serializable {
}
@Test
- public void testAvroSortPairAscAsc() throws Exception {
+ public void testAvroSortPairAscDesc() throws Exception {
runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
"this doc has this text");
}
@Test
- @Ignore("Avro sorting only works in field order at the moment")
- public void testAvroSortPairSecondAscFirstDesc() throws Exception {
+ public void testAvroSortPairSecondDescFirstAsc() throws Exception {
runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
"this doc has this text");
}
@@ -220,15 +218,15 @@ public class SortIT implements Serializable {
String inputPath = tmpDir.copyResourceFileName("docs.txt");
PCollection<String> input = pipeline.readTextFile(inputPath);
- PCollection<Pair<String, String>> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+ PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
@Override
public void process(String input, Emitter<Pair<String, String>> emitter) {
String[] split = input.split("[\t]+");
emitter.emit(Pair.of(split[0], split[1]));
}
- }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+ }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
- Iterable<Pair<String, String>> lines = sorted.materialize();
+ List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
Pair<String, String> l = lines.iterator().next();
assertEquals(firstField, l.first());
assertEquals(secondField, l.second());
@@ -250,7 +248,7 @@ public class SortIT implements Serializable {
}
}, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
- Iterable<Tuple3<String, String, String>> lines = sorted.materialize();
+ List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
Tuple3<String, String, String> l = lines.iterator().next();
assertEquals(firstField, l.first());
assertEquals(secondField, l.second());
http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/resources/sort_by_value.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/sort_by_value.txt b/crunch/src/it/resources/sort_by_value.txt
new file mode 100644
index 0000000..73f7d11
--- /dev/null
+++ b/crunch/src/it/resources/sort_by_value.txt
@@ -0,0 +1,5 @@
+A 2
+B 1
+C 3
+D 2
+E 1
http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
index f2729a2..cca5a79 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -18,29 +18,33 @@
package org.apache.crunch.lib;
import java.util.Arrays;
-import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryData;
import org.apache.avro.reflect.ReflectData;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.GroupingOptions.Builder;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.TupleWritable;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.hadoop.conf.Configurable;
@@ -54,7 +58,6 @@ import org.apache.hadoop.mapred.JobConf;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -120,13 +123,7 @@ public class Sort {
emitter.emit(Pair.of(input, (Void) null));
}
}, type);
- PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
- return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
- @Override
- public void process(Pair<T, Void> input, Emitter<T> emitter) {
- emitter.emit(input.first());
- }
- }, collection.getPType());
+ return pt.groupByKey(options).ungroup().keys();
}
/**
@@ -151,6 +148,156 @@ public class Sort {
return table.groupByKey(options).ungroup();
}
+ static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+ private final int index;
+
+ public SingleKeyFn(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public K map(V input) {
+ return (K) input.get(index);
+ }
+ }
+
+ static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+ private final int[] indices;
+ private final TupleFactory tupleFactory;
+
+ public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+ this.indices = indices;
+ this.tupleFactory = tupleFactory;
+ }
+
+ @Override
+ public K map(V input) {
+ Object[] values = new Object[indices.length];
+ for (int i = 0; i < indices.length; i++) {
+ values[i] = input.get(indices[i]);
+ }
+ return (K) tupleFactory.makeTuple(values);
+ }
+ }
+
+ static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+
+ private final int[] indices;
+ private final String schemaJson;
+ private transient Schema schema;
+
+ public AvroGenericFn(int[] indices, Schema schema) {
+ this.indices = indices;
+ this.schemaJson = schema.toString();
+ }
+
+ @Override
+ public void initialize() {
+ this.schema = (new Schema.Parser()).parse(schemaJson);
+ }
+
+ @Override
+ public GenericRecord map(V input) {
+ GenericRecord rec = new GenericData.Record(schema);
+ for (int i = 0; i < indices.length; i++) {
+ rec.put(i, input.get(indices[i]));
+ }
+ return rec;
+ }
+ }
+
+ static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+ // Guarantee each tuple schema has a globally unique name
+ String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+ Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+ List<Schema.Field> fields = Lists.newArrayList();
+ AvroType<S> parentAvroType = (AvroType<S>) ptype;
+ Schema parentAvroSchema = parentAvroType.getSchema();
+
+ for (int index = 0; index < orders.length; index++) {
+ ColumnOrder columnOrder = orders[index];
+ AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+ Schema fieldSchema = atype.getSchema();
+ String fieldName = parentAvroSchema.getFields().get(index).name();
+ // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
+ // Text instances: making this consistent
+ Schema.Field.Order order = columnOrder.order == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
+ Schema.Field.Order.ASCENDING;
+ fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ static class KeyExtraction<V extends Tuple> {
+
+ private PType<V> ptype;
+ private final ColumnOrder[] columnOrder;
+ private final int[] cols;
+
+ private MapFn<V, Object> byFn;
+ private PType<Object> keyPType;
+
+ public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+ this.ptype = ptype;
+ this.columnOrder = columnOrder;
+ this.cols = new int[columnOrder.length];
+ for (int i = 0; i < columnOrder.length; i++) {
+ cols[i] = columnOrder[i].column - 1;
+ }
+ init();
+ }
+
+ private void init() {
+ List<PType> pt = ptype.getSubTypes();
+ PTypeFamily ptf = ptype.getFamily();
+ if (cols.length == 1) {
+ byFn = new SingleKeyFn(cols[0]);
+ keyPType = pt.get(cols[0]);
+ } else {
+ TupleFactory tf = null;
+ switch (cols.length) {
+ case 2:
+ tf = TupleFactory.PAIR;
+ keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
+ break;
+ case 3:
+ tf = TupleFactory.TUPLE3;
+ keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
+ break;
+ case 4:
+ tf = TupleFactory.TUPLE4;
+ keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
+ break;
+ default:
+ PType[] pts = new PType[cols.length];
+ for (int i = 0; i < pts.length; i++) {
+ pts[i] = pt.get(cols[i]);
+ }
+ tf = TupleFactory.TUPLEN;
+ keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
+ }
+
+ if (ptf == AvroTypeFamily.getInstance()) {
+ Schema s = createOrderedTupleSchema(keyPType, columnOrder);
+ keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
+ byFn = new AvroGenericFn(cols, s);
+ } else {
+ byFn = new TupleKeyFn(cols, tf);
+ }
+ }
+
+ }
+
+ public MapFn<V, Object> getByFn() {
+ return byFn;
+ }
+
+ public PType<Object> getKeyType() {
+ return keyPType;
+ }
+ }
+
/**
* Sorts the {@link PCollection} of {@link Pair}s using the specified column
* ordering.
@@ -159,28 +306,7 @@ public class Sort {
*/
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
ColumnOrder... columnOrders) {
- // put U and V into a pair/tuple in the key so we can do grouping and
- // sorting
- PTypeFamily tf = collection.getTypeFamily();
- PType<Pair<U, V>> pType = collection.getPType();
- @SuppressWarnings("unchecked")
- PTableType<Pair<U, V>, Void> type = tf.tableOf(tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
- tf.nulls());
- PTable<Pair<U, V>, Void> pt = collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
- @Override
- public void process(Pair<U, V> input, Emitter<Pair<Pair<U, V>, Void>> emitter) {
- emitter.emit(Pair.of(input, (Void) null));
- }
- }, type);
- Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
- PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
- return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>, Void>, Pair<U, V>>() {
- @Override
- public void process(Pair<Pair<U, V>, Void> input, Emitter<Pair<U, V>> emitter) {
- emitter.emit(input.first());
- }
- }, collection.getPType());
+ return sortTuples(collection, columnOrders);
}
/**
@@ -191,27 +317,7 @@ public class Sort {
*/
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
ColumnOrder... columnOrders) {
- PTypeFamily tf = collection.getTypeFamily();
- PType<Tuple3<V1, V2, V3>> pType = collection.getPType();
- @SuppressWarnings("unchecked")
- PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf(
- tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)), tf.nulls());
- PTable<Tuple3<V1, V2, V3>, Void> pt = collection.parallelDo(
- new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
- @Override
- public void process(Tuple3<V1, V2, V3> input, Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
- emitter.emit(Pair.of(input, (Void) null));
- }
- }, type);
- Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
- PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
- return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>>() {
- @Override
- public void process(Pair<Tuple3<V1, V2, V3>, Void> input, Emitter<Tuple3<V1, V2, V3>> emitter) {
- emitter.emit(input.first());
- }
- }, collection.getPType());
+ return sortTuples(collection, columnOrders);
}
/**
@@ -222,27 +328,7 @@ public class Sort {
*/
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
- PTypeFamily tf = collection.getTypeFamily();
- PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType();
- @SuppressWarnings("unchecked")
- PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(tf.quads(pType.getSubTypes().get(0), pType.getSubTypes()
- .get(1), pType.getSubTypes().get(2), pType.getSubTypes().get(3)), tf.nulls());
- PTable<Tuple4<V1, V2, V3, V4>, Void> pt = collection.parallelDo(
- new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
- @Override
- public void process(Tuple4<V1, V2, V3, V4> input, Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
- emitter.emit(Pair.of(input, (Void) null));
- }
- }, type);
- Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
- PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
- return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>>() {
- @Override
- public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input, Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
- emitter.emit(input.first());
- }
- }, collection.getPType());
+ return sortTuples(collection, columnOrders);
}
/**
@@ -251,25 +337,14 @@ public class Sort {
*
* @return a {@link PCollection} representing the sorted collection.
*/
- public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection, ColumnOrder... columnOrders) {
+ public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) {
PTypeFamily tf = collection.getTypeFamily();
- PType<TupleN> pType = collection.getPType();
- PTableType<TupleN, Void> type = tf.tableOf(tf.tuples(pType.getSubTypes().toArray(new PType[0])), tf.nulls());
- PTable<TupleN, Void> pt = collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
- @Override
- public void process(TupleN input, Emitter<Pair<TupleN, Void>> emitter) {
- emitter.emit(Pair.of(input, (Void) null));
- }
- }, type);
+ PType<T> pType = collection.getPType();
+ KeyExtraction<T> ke = new KeyExtraction<T>(pType, columnOrders);
+ PTable<Object, T> pt = collection.by(ke.getByFn(), ke.getKeyType());
Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
- PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
- return sortedPt.parallelDo(new DoFn<Pair<TupleN, Void>, TupleN>() {
- @Override
- public void process(Pair<TupleN, Void> input, Emitter<TupleN> emitter) {
- emitter.emit(input.first());
- }
- }, collection.getPType());
+ GroupingOptions options = buildGroupingOptions(conf, tf, ke.getKeyType(), pType, columnOrders);
+ return pt.groupByKey(options).ungroup().values();
}
// TODO: move to type family?
@@ -294,15 +369,23 @@ public class Sort {
return builder.build();
}
- private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
- ColumnOrder[] columnOrders) {
+ private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> keyType,
+ PType<?> valueType, ColumnOrder[] columnOrders) {
Builder builder = GroupingOptions.builder();
if (tf == WritableTypeFamily.getInstance()) {
- TupleWritableComparator.configureOrdering(conf, columnOrders);
- builder.sortComparatorClass(TupleWritableComparator.class);
+ if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
+ builder.sortComparatorClass(ReverseWritableComparator.class);
+ } else {
+ TupleWritableComparator.configureOrdering(conf, columnOrders);
+ builder.sortComparatorClass(TupleWritableComparator.class);
+ }
} else if (tf == AvroTypeFamily.getInstance()) {
- TupleAvroComparator.configureOrdering(conf, columnOrders, ptype);
- builder.sortComparatorClass(TupleAvroComparator.class);
+ if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
+ AvroType<T> avroType = (AvroType<T>) keyType;
+ Schema schema = avroType.getSchema();
+ conf.set("crunch.schema", schema.toString());
+ builder.sortComparatorClass(ReverseAvroComparator.class);
+ }
} else {
throw new RuntimeException("Unrecognized type family: " + tf);
}
@@ -340,7 +423,7 @@ public class Sort {
static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
- Schema schema;
+ private Schema schema;
@Override
public void setConf(Configuration conf) {
@@ -397,12 +480,11 @@ public class Sort {
public int compare(WritableComparable a, WritableComparable b) {
TupleWritable ta = (TupleWritable) a;
TupleWritable tb = (TupleWritable) b;
- for (int i = 0; i < columnOrders.length; i++) {
- int index = columnOrders[i].column - 1;
+ for (int index = 0; index < columnOrders.length; index++) {
int order = 1;
- if (columnOrders[i].order == Order.ASCENDING) {
+ if (columnOrders[index].order == Order.ASCENDING) {
order = 1;
- } else if (columnOrders[i].order == Order.DESCENDING) {
+ } else if (columnOrders[index].order == Order.DESCENDING) {
order = -1;
} else { // ignore
continue;
@@ -431,7 +513,7 @@ public class Sort {
}
}
}
- return 0; // ordering using specified columns found no differences
+ return 0; // ordering using specified cols found no differences
}
@Override
@@ -457,69 +539,4 @@ public class Sort {
}
}
- static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
-
- Schema schema;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
- }
- }
-
- public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders, PType<S> ptype) {
- Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
- conf.set("crunch.schema", orderedSchema.toString());
- }
-
- // TODO: move to Avros
- // TODO: need to re-order columns in map output then switch back in the
- // reduce
- // this will require more extensive changes in Crunch
- private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
- // Guarantee each tuple schema has a globally unique name
- String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
- Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
- List<Schema.Field> fields = Lists.newArrayList();
- AvroType<S> parentAvroType = (AvroType<S>) ptype;
- Schema parentAvroSchema = parentAvroType.getSchema();
-
- BitSet orderedColumns = new BitSet();
- // First add any fields specified by ColumnOrder
- for (ColumnOrder columnOrder : orders) {
- int index = columnOrder.column - 1;
- AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
- Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
- String fieldName = parentAvroSchema.getFields().get(index).name();
- fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.valueOf(columnOrder.order
- .name())));
- orderedColumns.set(index);
- }
- // Then add remaining fields from the ptypes, with no sort order
- for (int i = 0; i < ptype.getSubTypes().size(); i++) {
- if (orderedColumns.get(i)) {
- continue;
- }
- AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
- Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
- String fieldName = parentAvroSchema.getFields().get(i).name();
- fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.IGNORE));
- }
- schema.setFields(fields);
- return schema;
- }
-
- @Override
- public int compare(T o1, T o2) {
- return ReflectData.get().compare(o1, o2, schema);
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
- return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
- }
-
- }
}