You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/21 14:51:06 UTC
[2/3] git commit: Secondary sort integration tests
Secondary sort integration tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/db0ce8e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/db0ce8e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/db0ce8e8
Branch: refs/heads/master
Commit: db0ce8e8985c3d0d686898aa4182bdf1c004d7b5
Parents: 521cea2
Author: Josh Wills <jw...@apache.org>
Authored: Sun Oct 14 11:40:28 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 21 05:07:55 2012 -0700
----------------------------------------------------------------------
.../org/apache/crunch/lib/SecondarySortIT.java | 65 +++++++++
crunch/src/it/resources/secondary_sort_input.txt | 7 +
.../java/org/apache/crunch/lib/SecondarySort.java | 101 ++++-----------
3 files changed, 98 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
new file mode 100644
index 0000000..242f621
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/** Verifies that {@link SecondarySort#sortAndApply} hands each key its values ordered by their first integer. */
+public class SecondarySortIT extends CrunchTestSupport implements Serializable {
+
+ @Test
+ public void testSecondarySort() throws Exception {
+ Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
+ String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
+
+ PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
+ .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
+ @Override
+ public Pair<String, Pair<Integer, Integer>> map(String input) {
+ String[] pieces = input.split(","); // only the first three columns are read; extras (e.g. "two,1,7,9") are ignored
+ return Pair.of(pieces[0],
+ Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
+ }
+ }, tableOf(strings(), pairs(ints(), ints())));
+ Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
+ @Override
+ public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
+ Joiner j = Joiner.on(',');
+ return j.join(input.first(), j.join(input.second())); // key, then the group's pairs in the order received
+ }
+ }, strings()).materialize();
+ assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
+ ImmutableList.copyOf(lines));
+ p.done(); // NOTE(review): not reached if the assertion above fails; consider try/finally
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/resources/secondary_sort_input.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/secondary_sort_input.txt b/crunch/src/it/resources/secondary_sort_input.txt
new file mode 100644
index 0000000..3c7be93
--- /dev/null
+++ b/crunch/src/it/resources/secondary_sort_input.txt
@@ -0,0 +1,7 @@
+one,1,1
+one,2,-3
+two,4,5
+two,2,6
+two,1,7,9
+three,0,-1
+one,-5,10
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
index 5a826fd..ebf7fb4 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -24,6 +24,7 @@ import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.join.JoinUtils;
@@ -32,40 +33,33 @@ import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
-import com.google.common.collect.ImmutableList;
-
/**
- * Utilities for performing a secondary sort on a PTable<K, Pair<V1, V2>> instance, i.e., sort on the
- * key and then sort the values by V1.
+ * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>} collection, i.e., grouping by K and ordering each group's values by V1.
*/
public class SecondarySort {
-
- public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) {
- PTypeFamily ptf = input.getTypeFamily();
- return sortAndApply(input, new SSUnpackFn<K, V1, V2>(),
- ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())));
- }
+ /**
+ * Group the given {@code PTable} by K with each group's values ordered by V1, then apply
+ * {@code doFn} to every (K, sorted values) group to yield an output {@code PCollection<T>}.
+ */
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
- PTypeFamily ptf = input.getTypeFamily();
- PType<Pair<V1, V2>> valueType = input.getValueType();
- PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
- ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)),
- valueType);
- PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(),
- ptf.collections(input.getValueType()));
- return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
- .groupByKey(
- GroupingOptions.builder()
- .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
- .partitionerClass(JoinUtils.getPartitionerClass(ptf))
- .build())
+ return prepare(input)
.parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
}
+ /**
+ * Group the given {@code PTable} by K with each group's values ordered by V1, then apply
+ * {@code doFn} to every (K, sorted values) group to yield an output {@code PTable<U, V>}.
+ */
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
+ return prepare(input)
+ .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype);
+ }
+
+ private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare(
+ PTable<K, Pair<V1, V2>> input) {
PTypeFamily ptf = input.getTypeFamily();
PType<Pair<V1, V2>> valueType = input.getValueType();
PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
@@ -78,10 +72,16 @@ public class SecondarySort {
GroupingOptions.builder()
.groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
.partitionerClass(JoinUtils.getPartitionerClass(ptf))
- .build())
- .parallelDo("SecondarySort.apply", new SSTableWrapFn<K, V1, V2, U, V>(doFn), ptype);
+ .build());
}
+ private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> { // re-keys each record for the secondary sort
+ @Override
+ public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
+ return Pair.of(Pair.of(input.first(), input.second().first()), input.second()); // key (K, V1) so V1 takes part in sorting; full (V1, V2) kept as value
+ }
+ }
+
private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;
@@ -113,54 +113,5 @@ public class SecondarySort {
public void cleanup(Emitter<T> emitter) {
intern.cleanup(emitter);
}
- }
-
- private static class SSTableWrapFn<K, V1, V2, U, V> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, Pair<U, V>> {
- private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern;
-
- public SSTableWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern) {
- this.intern = intern;
- }
-
- @Override
- public void configure(Configuration conf) {
- intern.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- intern.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- intern.setContext(getContext());
- }
-
- @Override
- public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<Pair<U, V>> emitter) {
- intern.process(Pair.of(input.first().first(), input.second()), emitter);
- }
-
- @Override
- public void cleanup(Emitter<Pair<U, V>> emitter) {
- intern.cleanup(emitter);
- }
- }
-
- private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
- @Override
- public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
- return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
- }
- }
-
- private static class SSUnpackFn<K, V1, V2> extends
- MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> {
- @Override
- public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) {
- Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second());
- return Pair.of(input.first(), c);
- }
- }
+ }
}