You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/21 14:51:06 UTC
[3/3] git commit: Create a SecondarySort library function and update
the SS example to use it.
Create a SecondarySort library function and update the SS example to use it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/521cea25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/521cea25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/521cea25
Branch: refs/heads/master
Commit: 521cea2573b680c849f70e13e615e715dd2cdad0
Parents: 298fbaa
Author: Josh Wills <jw...@apache.org>
Authored: Sat Oct 13 19:33:03 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 21 05:07:54 2012 -0700
----------------------------------------------------------------------
.../org/apache/crunch/examples/SecondarySort.java | 32 +--
.../java/org/apache/crunch/lib/SecondarySort.java | 166 +++++++++++++++
2 files changed, 175 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
index dc2f8b0..3e08046 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
@@ -21,15 +21,12 @@ import java.io.Serializable;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
-import org.apache.crunch.GroupingOptions;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
-import org.apache.crunch.lib.join.JoinUtils;
-import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -85,10 +82,10 @@ public class SecondarySort extends Configured implements Tool, Serializable {
// a pair of pairs, the first of which will be grouped by (first member) and
// then sorted by (second member). The second pair is payload which can be
// passed in an Iterable object.
- PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
- new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() {
+ PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
+ new DoFn<String, Pair<String, Pair<Long, String>>>() {
@Override
- public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) {
+ public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) {
int i = 0;
String key = "";
long timestamp = 0;
@@ -116,21 +113,11 @@ public class SecondarySort extends Configured implements Tool, Serializable {
}
if (i == 3) {
Long sortby = new Long(timestamp);
- emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby),
- new Pair<Long, String>(sortby, value)));
+ emitter.emit(Pair.of(key, Pair.of(sortby, value)));
} else {
this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
}
- }}, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings())));
-
- // Define partitioning and grouping properties
- GroupingOptions groupingOptions = GroupingOptions.builder()
- .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1))
- .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance()))
- .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build();
-
- // Do the rest of the processing extracting a list of things according to
- // groups defined in the groupingOptions
+ }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings())));
// The output of the above input will be (with one reducer):
@@ -138,14 +125,13 @@ public class SecondarySort extends Configured implements Tool, Serializable {
// three : [[0,-1]]
// two : [[1,7,9],[2,6],[4,5]]
- pairs.groupByKey(groupingOptions)
- .parallelDo("group_records",
- new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() {
+ org.apache.crunch.lib.SecondarySort.sortAndApply(pairs,
+ new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
final StringBuilder sb = new StringBuilder();
@Override
- public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+ public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
sb.setLength(0);
- sb.append(input.first().get(0));
+ sb.append(input.first());
sb.append(" : [");
boolean first = true;
for(Pair<Long, String> pair : input.second()) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
new file mode 100644
index 0000000..5a826fd
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>} instance, i.e., sort on
+ * the key and then sort the values by {@code V1}.
+ *
+ * <p>Each record is re-keyed with the composite key {@code Pair<K, V1>} and grouped with the grouping
+ * comparator and partitioner supplied by {@link JoinUtils} for the input's type family.
+ */
+public class SecondarySort {
+
+  /**
+   * Performs the secondary sort and materializes the values of every group into a collection.
+   *
+   * @param input the table to sort; {@code V1} is the field the values are sorted by
+   * @return a table from each key to a collection holding a copy of that key's values, in the order in
+   *         which the grouped iterable delivered them
+   */
+  public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) {
+    PTypeFamily ptf = input.getTypeFamily();
+    return sortAndApply(input, new SSUnpackFn<K, V1, V2>(),
+        ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())));
+  }
+
+  /**
+   * Performs the secondary sort and applies {@code doFn} to every (key, values) group, producing a
+   * {@code PCollection}.
+   *
+   * @param input the table to sort; {@code V1} is the field the values are sorted by
+   * @param doFn the function invoked once per group with the original key and the group's values
+   * @param ptype the type of the elements emitted by {@code doFn}
+   * @return the collection of everything {@code doFn} emitted
+   */
+  public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<Pair<V1, V2>> valueType = input.getValueType();
+    // Intermediate table type: composite key (K, V1) -> original value (V1, V2). The first sub-type of
+    // the value pair is the PType of V1.
+    PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
+        ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)),
+        valueType);
+    return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
+        .groupByKey(
+            // NOTE(review): presumably these group and partition on the first element of the
+            // composite key only, so one group holds all values of a K - confirm against JoinUtils.
+            GroupingOptions.builder()
+                .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
+                .partitionerClass(JoinUtils.getPartitionerClass(ptf))
+                .build())
+        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
+  }
+
+  /**
+   * Performs the secondary sort and applies {@code doFn} to every (key, values) group, producing a
+   * {@code PTable}.
+   *
+   * @param input the table to sort; {@code V1} is the field the values are sorted by
+   * @param doFn the function invoked once per group with the original key and the group's values
+   * @param ptype the table type of the pairs emitted by {@code doFn}
+   * @return the table of everything {@code doFn} emitted
+   */
+  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<Pair<V1, V2>> valueType = input.getValueType();
+    // Intermediate table type: composite key (K, V1) -> original value (V1, V2). The first sub-type of
+    // the value pair is the PType of V1.
+    PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
+        ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)),
+        valueType);
+    return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
+        .groupByKey(
+            // NOTE(review): presumably these group and partition on the first element of the
+            // composite key only, so one group holds all values of a K - confirm against JoinUtils.
+            GroupingOptions.builder()
+                .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
+                .partitionerClass(JoinUtils.getPartitionerClass(ptf))
+                .build())
+        .parallelDo("SecondarySort.apply", new SSTableWrapFn<K, V1, V2, U, V>(doFn), ptype);
+  }
+
+  /**
+   * Adapts a caller-supplied {@code DoFn} keyed by {@code K} to the grouped output, which is keyed by
+   * the composite {@code Pair<K, V1>}. All lifecycle calls are forwarded to the wrapped function.
+   */
+  private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
+    private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;
+
+    public SSWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern) {
+      this.intern = intern;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      intern.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      intern.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      // Hand over the task context first, then run the wrapped function's own initialization hook;
+      // nothing else would ever invoke it because only this wrapper is part of the pipeline.
+      intern.setContext(getContext());
+      intern.initialize();
+    }
+
+    @Override
+    public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<T> emitter) {
+      // Drop the V1 half of the composite key so the wrapped function sees the original key.
+      intern.process(Pair.of(input.first().first(), input.second()), emitter);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      intern.cleanup(emitter);
+    }
+  }
+
+  /**
+   * Table-producing counterpart of {@link SSWrapFn}: adapts a caller-supplied {@code DoFn} that emits
+   * {@code Pair<U, V>} to the grouped output keyed by the composite {@code Pair<K, V1>}.
+   */
+  private static class SSTableWrapFn<K, V1, V2, U, V> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, Pair<U, V>> {
+    private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern;
+
+    public SSTableWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern) {
+      this.intern = intern;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      intern.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      intern.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      // Hand over the task context first, then run the wrapped function's own initialization hook;
+      // nothing else would ever invoke it because only this wrapper is part of the pipeline.
+      intern.setContext(getContext());
+      intern.initialize();
+    }
+
+    @Override
+    public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<Pair<U, V>> emitter) {
+      // Drop the V1 half of the composite key so the wrapped function sees the original key.
+      intern.process(Pair.of(input.first().first(), input.second()), emitter);
+    }
+
+    @Override
+    public void cleanup(Emitter<Pair<U, V>> emitter) {
+      intern.cleanup(emitter);
+    }
+  }
+
+  /**
+   * Re-keys a record {@code (K, (V1, V2))} as {@code ((K, V1), (V1, V2))}, copying the sort field
+   * {@code V1} into the key while leaving the value unchanged.
+   */
+  private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
+    @Override
+    public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
+      return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
+    }
+  }
+
+  /**
+   * Copies the values of a group into an {@link ImmutableList} so they can be returned as a
+   * {@code Collection} paired with the group's key.
+   */
+  private static class SSUnpackFn<K, V1, V2> extends
+      MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> {
+    @Override
+    public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) {
+      Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second());
+      return Pair.of(input.first(), c);
+    }
+  }
+}