You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/03 06:42:06 UTC

[2/2] git commit: CRUNCH-78: Add a secondary sort example.

CRUNCH-78: Add a secondary sort example.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/167c9c28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/167c9c28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/167c9c28

Branch: refs/heads/master
Commit: 167c9c2818400103f1f0860bbf0d7f2e31a46c2b
Parents: e7b5feb
Author: Alex Kozlov <al...@cloudera.com>
Authored: Wed Sep 26 12:04:47 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Oct 2 20:18:36 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/examples/SecondarySort.java  |  177 +++++++++++++++
 1 files changed, 177 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/167c9c28/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
new file mode 100644
index 0000000..dc2f8b0
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Splitter;
+
+@SuppressWarnings("serial")
+public class SecondarySort extends Configured implements Tool, Serializable {
+
+  static enum COUNTERS {
+    CORRUPT_TIMESTAMP,
+    CORRUPT_LINE
+  }
+
+  // Input records are comma separated. The first field is grouping record. The
+  // second is the one to sort on (a long in this implementation). The rest is
+  // payload to be sorted.
+
+  // For example:
+
+  // one,1,1
+  // one,2,-3
+  // two,4,5
+  // two,2,6
+  // two,1,7,9
+  // three,0,-1
+  // one,-5,10
+  // one,-10,garbage
+
+  private static final char SPLIT_ON = ',';
+  private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3);
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println();
+      System.err.println("Usage: " + this.getClass().getName()
+          + " [generic options] input output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(SecondarySort.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Define a function that parses each line in a PCollection of Strings into
+    // a pair of pairs, the first of which will be grouped by (first member) and
+    // the sorted by (second memeber). The second pair is payload which can be
+    // passed in an Iterable object.
+    PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
+        new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() {
+          @Override
+          public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) {
+            int i = 0;
+            String key = "";
+            long timestamp = 0;
+            String value = "";
+            for (String element : INPUT_SPLITTER.split(line)) {
+              switch (++i) {
+              case 1:
+                key = element;
+                break;
+              case 2:
+                try {
+                  timestamp = Long.parseLong(element);
+                } catch (NumberFormatException e) {
+                  System.out.println("Timestamp not in long format '" + line + "'");
+                  this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1);
+                }
+                break;
+              case 3:
+                value = element;
+                break;
+              default:
+                System.err.println("i = " + i + " should never happen!");
+                break;
+              }
+            }
+            if (i == 3) {
+              Long sortby = new Long(timestamp);
+              emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby),
+                  new Pair<Long, String>(sortby, value)));
+            } else {
+              this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
+            }
+          }}, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings())));
+
+    // Define partitioning and grouping properties
+    GroupingOptions groupingOptions = GroupingOptions.builder()
+        .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1))
+        .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance()))
+        .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build();
+
+    // Do the rest of the processing extracting a list of things according to
+    // groups defined in the groupingOptions
+
+    // The output of the above input will be (with one reducer):
+
+    // one : [[-10,garbage],[-5,10],[1,1],[2,-3]]
+    // three : [[0,-1]]
+    // two : [[1,7,9],[2,6],[4,5]]
+
+    pairs.groupByKey(groupingOptions)
+        .parallelDo("group_records",
+        new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() {
+          final StringBuilder sb = new StringBuilder();
+          @Override
+          public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+            sb.setLength(0);
+            sb.append(input.first().get(0));
+            sb.append(" : [");
+            boolean first = true;
+            for(Pair<Long, String> pair : input.second()) {
+              if (first) {
+                first = false;
+              } else {
+                sb.append(',');
+              }
+              sb.append(pair);
+            }
+            sb.append("]");
+            emitter.emit(sb.toString());
+          }
+        }, Writables.strings()).write(To.textFile(args[1]));
+
+    // Execute the pipeline as a MapReduce.
+    return pipeline.done().succeeded() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int exitCode = -1;
+    try {
+      exitCode = ToolRunner.run(new Configuration(), new SecondarySort(), args);
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+    System.exit(exitCode);
+  }
+}