You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/21 14:51:06 UTC

[3/3] git commit: Create a SecondarySort library function and update the SS example to use it.

Create a SecondarySort library function and update the SS example to use it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/521cea25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/521cea25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/521cea25

Branch: refs/heads/master
Commit: 521cea2573b680c849f70e13e615e715dd2cdad0
Parents: 298fbaa
Author: Josh Wills <jw...@apache.org>
Authored: Sat Oct 13 19:33:03 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 21 05:07:54 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/examples/SecondarySort.java  |   32 +--
 .../java/org/apache/crunch/lib/SecondarySort.java  |  166 +++++++++++++++
 2 files changed, 175 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
index dc2f8b0..3e08046 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
@@ -21,15 +21,12 @@ import java.io.Serializable;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
-import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
-import org.apache.crunch.lib.join.JoinUtils;
-import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -85,10 +82,10 @@ public class SecondarySort extends Configured implements Tool, Serializable {
     // a pair of pairs, the first of which will be grouped by (first member) and
     // the sorted by (second memeber). The second pair is payload which can be
     // passed in an Iterable object.
-    PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
-        new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() {
+    PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
+        new DoFn<String, Pair<String, Pair<Long, String>>>() {
           @Override
-          public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) {
+          public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) {
             int i = 0;
             String key = "";
             long timestamp = 0;
@@ -116,21 +113,11 @@ public class SecondarySort extends Configured implements Tool, Serializable {
             }
             if (i == 3) {
               Long sortby = new Long(timestamp);
-              emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby),
-                  new Pair<Long, String>(sortby, value)));
+              emitter.emit(Pair.of(key, Pair.of(sortby, value)));
             } else {
               this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
             }
-          }}, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings())));
-
-    // Define partitioning and grouping properties
-    GroupingOptions groupingOptions = GroupingOptions.builder()
-        .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1))
-        .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance()))
-        .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build();
-
-    // Do the rest of the processing extracting a list of things according to
-    // groups defined in the groupingOptions
+          }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings())));
 
     // The output of the above input will be (with one reducer):
 
@@ -138,14 +125,13 @@ public class SecondarySort extends Configured implements Tool, Serializable {
     // three : [[0,-1]]
     // two : [[1,7,9],[2,6],[4,5]]
 
-    pairs.groupByKey(groupingOptions)
-        .parallelDo("group_records",
-        new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() {
+    org.apache.crunch.lib.SecondarySort.sortAndApply(pairs,
+        new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
           final StringBuilder sb = new StringBuilder();
           @Override
-          public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+          public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
             sb.setLength(0);
-            sb.append(input.first().get(0));
+            sb.append(input.first());
             sb.append(" : [");
             boolean first = true;
             for(Pair<Long, String> pair : input.second()) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
new file mode 100644
index 0000000..5a826fd
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>}: records are
+ * grouped by {@code K}, and within each group the values are delivered in order of {@code V1}.
+ *
+ * <p>This works by re-keying every record on the composite key {@code (K, V1)} and grouping with
+ * the partitioner and grouping comparator from {@link JoinUtils}, which partition and group on the
+ * first element of that pair key; the sort on the full composite key then orders each group by V1.
+ */
+public class SecondarySort {
+
+  /**
+   * Groups {@code input} by key and collects each key's values, sorted by {@code V1}.
+   *
+   * @param input the table to sort; each value is a {@code (sort key, payload)} pair
+   * @return a table mapping each key to an immutable list of its values in {@code V1} order
+   */
+  public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) {
+    PTypeFamily ptf = input.getTypeFamily();
+    return sortAndApply(input, new SSUnpackFn<K, V1, V2>(),
+        ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())));
+  }
+
+  /**
+   * Performs a secondary sort on {@code input} and applies {@code doFn} to each resulting group.
+   *
+   * @param input the table to sort; each value is a {@code (sort key, payload)} pair
+   * @param doFn receives each key together with its values, ordered by {@code V1}
+   * @param ptype the type of the elements emitted by {@code doFn}
+   * @return the collection of everything {@code doFn} emits
+   */
+  public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
+    return prepare(input).parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
+  }
+
+  /**
+   * Performs a secondary sort on {@code input} and applies {@code doFn} to each resulting group,
+   * producing a table.
+   *
+   * @param input the table to sort; each value is a {@code (sort key, payload)} pair
+   * @param doFn receives each key together with its values, ordered by {@code V1}
+   * @param ptype the table type of the pairs emitted by {@code doFn}
+   * @return the table of everything {@code doFn} emits
+   */
+  public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
+      DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
+    return prepare(input).parallelDo("SecondarySort.apply",
+        new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype);
+  }
+
+  /**
+   * Re-keys each record on {@code (K, V1)} and groups the result so that every record sharing a
+   * {@code K} lands in the same group, with that group's values ordered by {@code V1}.
+   */
+  private static <K, V1, V2> PCollection<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>> prepare(
+      PTable<K, Pair<V1, V2>> input) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PType<Pair<V1, V2>> valueType = input.getValueType();
+    // The first subtype of a PType<Pair<V1, V2>> describes V1; getSubTypes() is untyped.
+    @SuppressWarnings("unchecked")
+    PType<V1> sortType = (PType<V1>) valueType.getSubTypes().get(0);
+    PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
+        ptf.pairs(input.getKeyType(), sortType), valueType);
+    // Partition and group on K only, so V1 in the composite key just drives the sort order.
+    GroupingOptions options = GroupingOptions.builder()
+        .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
+        .partitionerClass(JoinUtils.getPartitionerClass(ptf))
+        .build();
+    return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
+        .groupByKey(options);
+  }
+
+  /**
+   * Adapts a DoFn over {@code (K, values)} to the grouped {@code ((K, V1), values)} records
+   * produced by {@link #prepare}, forwarding every lifecycle call to the wrapped function.
+   */
+  private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
+    private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;
+
+    public SSWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern) {
+      this.intern = intern;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      intern.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      intern.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      intern.setContext(getContext());
+      // Forward the hook too, otherwise the wrapped function's own setup logic never runs.
+      intern.initialize();
+    }
+
+    @Override
+    public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<T> emitter) {
+      // Drop V1 from the key; it is still carried as the first element of every value.
+      intern.process(Pair.of(input.first().first(), input.second()), emitter);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      intern.cleanup(emitter);
+    }
+  }
+
+  /** Re-keys {@code (K, (V1, V2))} as {@code ((K, V1), (V1, V2))} so V1 takes part in the sort. */
+  private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
+    @Override
+    public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
+      return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
+    }
+  }
+
+  /** Copies each group's sorted values into an immutable list. */
+  private static class SSUnpackFn<K, V1, V2> extends
+      MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> {
+    @Override
+    public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) {
+      Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second());
+      return Pair.of(input.first(), c);
+    }
+  }
+}