You are viewing a plain text version of this content. (The canonical link was not preserved in this plain text copy.)
Posted to commits@crunch.apache.org by ma...@apache.org on 2012/09/20 07:51:57 UTC
git commit: CRUNCH-68: Fix command line parser for examples.
Updated Branches:
refs/heads/master a65feb569 -> 17ec15584
CRUNCH-68: Fix command line parser for examples.
Use GenericOptionsParser to deal with Hadoop options.
Make examples exit with appropriate status codes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/17ec1558
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/17ec1558
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/17ec1558
Branch: refs/heads/master
Commit: 17ec1558455f2ba7ab099da5f028b4b6b01adda0
Parents: a65feb5
Author: Matthias Friedrich <ma...@mafr.de>
Authored: Wed Sep 19 20:22:27 2012 +0200
Committer: Matthias Friedrich <ma...@mafr.de>
Committed: Thu Sep 20 07:29:50 2012 +0200
----------------------------------------------------------------------
.../apache/crunch/examples/AverageBytesByIP.java | 17 +++++++++-----
.../org/apache/crunch/examples/TotalBytesByIP.java | 17 +++++++++-----
.../java/org/apache/crunch/examples/WordCount.java | 17 +++++++++-----
3 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
index 868e38a..52b542a 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
@@ -29,6 +29,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -47,7 +48,9 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
public int run(String[] args) throws Exception {
- if (args.length != 2) {
+ String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+ if (remainingArgs.length != 3) {
System.err.println();
System.err.println("Two and only two arguments are accepted.");
System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
@@ -58,7 +61,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf());
// Reference a given text file as a collection of Strings.
- PCollection<String> lines = pipeline.readTextFile(args[0]);
+ PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
// Combiner used for summing up response size and count
CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = CombineFn.pairAggregator(CombineFn.SUM_LONGS,
@@ -75,10 +78,11 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
Writables.tableOf(Writables.strings(), Writables.doubles()));
// write the result to a text file
- pipeline.writeTextFile(avgs, args[1]);
+ pipeline.writeTextFile(avgs, remainingArgs[2]);
// Execute the pipeline as a MapReduce.
- pipeline.done();
- return 0;
+ PipelineResult result = pipeline.done();
+
+ return result.succeeded() ? 0 : 1;
}
// Function to calculate the average response size for a given ip address
@@ -129,6 +133,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
};
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
+ int result = ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
+ System.exit(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
index 1953e3a..59b05fa 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
@@ -28,6 +28,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -46,7 +47,9 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
public int run(String[] args) throws Exception {
- if (args.length != 2) {
+ String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+ if (remainingArgs.length != 3) {
System.err.println();
System.err.println("Two and only two arguments are accepted.");
System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
@@ -57,7 +60,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());
// Reference a given text file as a collection of Strings.
- PCollection<String> lines = pipeline.readTextFile(args[0]);
+ PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
// Combiner used for summing up response size
CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();
@@ -67,10 +70,11 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
.parallelDo(extractIPResponseSize, Writables.tableOf(Writables.strings(), Writables.longs())).groupByKey()
.combineValues(longSumCombiner);
- pipeline.writeTextFile(ipAddrResponseSize, args[1]);
+ pipeline.writeTextFile(ipAddrResponseSize, remainingArgs[2]);
// Execute the pipeline as a MapReduce.
- pipeline.done();
- return 0;
+ PipelineResult result = pipeline.done();
+
+ return result.succeeded() ? 0 : 1;
}
// Function to parse apache log records
@@ -101,6 +105,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
};
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
+ int result = ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
+ System.exit(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
index e4ce25b..31d99d3 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
@@ -24,6 +24,7 @@ import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,9 @@ import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool, Serializable {
public int run(String[] args) throws Exception {
- if (args.length != 3) {
+ String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+ if (remainingArgs.length != 3) {
System.err.println();
System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
System.err.println();
@@ -44,7 +47,7 @@ public class WordCount extends Configured implements Tool, Serializable {
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
// Reference a given text file as a collection of Strings.
- PCollection<String> lines = pipeline.readTextFile(args[1]);
+ PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
// Define a function that splits each line in a PCollection of Strings into
// a
@@ -64,13 +67,15 @@ public class WordCount extends Configured implements Tool, Serializable {
PTable<String, Long> counts = words.count();
// Instruct the pipeline to write the resulting counts to a text file.
- pipeline.writeTextFile(counts, args[2]);
+ pipeline.writeTextFile(counts, remainingArgs[2]);
// Execute the pipeline as a MapReduce.
- pipeline.done();
- return 0;
+ PipelineResult result = pipeline.done();
+
+ return result.succeeded() ? 0 : 1;
}
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new WordCount(), args);
+ int result = ToolRunner.run(new Configuration(), new WordCount(), args);
+ System.exit(result);
}
}