You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/03 06:42:06 UTC
[2/2] git commit: CRUNCH-78: Add a secondary sort example.
CRUNCH-78: Add a secondary sort example.
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/167c9c28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/167c9c28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/167c9c28
Branch: refs/heads/master
Commit: 167c9c2818400103f1f0860bbf0d7f2e31a46c2b
Parents: e7b5feb
Author: Alex Kozlov <al...@cloudera.com>
Authored: Wed Sep 26 12:04:47 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Oct 2 20:18:36 2012 -0700
----------------------------------------------------------------------
.../org/apache/crunch/examples/SecondarySort.java | 177 +++++++++++++++
1 files changed, 177 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/167c9c28/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
new file mode 100644
index 0000000..dc2f8b0
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Splitter;
+
+/**
+ * Secondary sort example: groups comma separated records by their first field
+ * and lists the payloads of each group ordered by the second field.
+ *
+ * Input records are comma separated. The first field is the grouping key. The
+ * second is the one to sort on (a long in this implementation). The rest is
+ * the payload to be sorted; the line is split into at most three fields, so
+ * the payload is whatever follows the second separator.
+ *
+ * For example:
+ *
+ * <pre>
+ * one,1,1
+ * one,2,-3
+ * two,4,5
+ * two,2,6
+ * two,1,7,9
+ * three,0,-1
+ * one,-5,10
+ * one,-10,garbage
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class SecondarySort extends Configured implements Tool, Serializable {
+
+  /** Counters incremented while parsing the input lines. */
+  static enum COUNTERS {
+    /** The second field of a line could not be parsed as a long. */
+    CORRUPT_TIMESTAMP,
+    /** A line did not split into exactly three fields. */
+    CORRUPT_LINE
+  }
+
+  private static final char SPLIT_ON = ',';
+  private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3);
+
+  /**
+   * Builds and runs the pipeline: parses the input lines, groups them with the
+   * grouping options defined below, and writes one text line per group.
+   *
+   * @param args the input path followed by the output path
+   * @return 0 if the pipeline succeeded, 1 on a usage error or a pipeline failure
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println();
+      System.err.println("Usage: " + this.getClass().getName()
+          + " [generic options] input output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(SecondarySort.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Define a function that parses each line in a PCollection of Strings into
+    // a pair of pairs, the first of which will be grouped by (first member) and
+    // then sorted by (second member). The second pair is payload which can be
+    // passed in an Iterable object.
+    PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
+        new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() {
+          @Override
+          public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) {
+            int i = 0;
+            String key = "";
+            long timestamp = 0;
+            boolean timestampValid = false;
+            String value = "";
+            for (String element : INPUT_SPLITTER.split(line)) {
+              switch (++i) {
+              case 1:
+                key = element;
+                break;
+              case 2:
+                try {
+                  timestamp = Long.parseLong(element);
+                  timestampValid = true;
+                } catch (NumberFormatException e) {
+                  System.out.println("Timestamp not in long format '" + line + "'");
+                  this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1);
+                }
+                break;
+              case 3:
+                value = element;
+                break;
+              default:
+                // The splitter is limited to three fields, so this is purely defensive.
+                System.err.println("i = " + i + " should never happen!");
+                break;
+              }
+            }
+            if (i != 3) {
+              this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
+            } else if (timestampValid) {
+              // A line whose timestamp failed to parse has already been counted as
+              // CORRUPT_TIMESTAMP; it is dropped here instead of being emitted with
+              // a fabricated sort key of 0.
+              Long sortby = Long.valueOf(timestamp);
+              emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby),
+                  new Pair<Long, String>(sortby, value)));
+            }
+          }
+        }, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings())));
+
+    // Define partitioning and grouping properties. The number of reducers is
+    // read from "mapred.reduce.tasks" (default 1).
+    // NOTE(review): the JoinUtils partitioner and grouping comparator presumably
+    // look only at the first member of the key pair, which is what lets the
+    // second member act as the secondary sort key - confirm against JoinUtils.
+    GroupingOptions groupingOptions = GroupingOptions.builder()
+        .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1))
+        .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance()))
+        .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build();
+
+    // Do the rest of the processing extracting a list of things according to
+    // groups defined in the groupingOptions
+
+    // The output of the above input will be (with one reducer):
+
+    // one : [[-10,garbage],[-5,10],[1,1],[2,-3]]
+    // three : [[0,-1]]
+    // two : [[1,7,9],[2,6],[4,5]]
+
+    pairs.groupByKey(groupingOptions)
+        .parallelDo("group_records",
+            new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() {
+              // Reused across calls; reset at the start of every process() invocation.
+              final StringBuilder sb = new StringBuilder();
+              @Override
+              public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+                sb.setLength(0);
+                // Only the grouping field of the composite key is printed.
+                sb.append(input.first().first());
+                sb.append(" : [");
+                boolean first = true;
+                for (Pair<Long, String> pair : input.second()) {
+                  if (first) {
+                    first = false;
+                  } else {
+                    sb.append(',');
+                  }
+                  sb.append(pair);
+                }
+                sb.append("]");
+                emitter.emit(sb.toString());
+              }
+            }, Writables.strings()).write(To.textFile(args[1]));
+
+    // Execute the pipeline as a MapReduce.
+    return pipeline.done().succeeded() ? 0 : 1;
+  }
+
+  /**
+   * Runs the example through {@link ToolRunner} and exits the JVM with the
+   * resulting status, or with -1 if the run threw.
+   */
+  public static void main(String[] args) throws Exception {
+    int exitCode = -1;
+    try {
+      exitCode = ToolRunner.run(new Configuration(), new SecondarySort(), args);
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+    System.exit(exitCode);
+  }
+}