You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ma...@apache.org on 2012/09/20 07:51:57 UTC

git commit: CRUNCH-68: Fix command line parser for examples.

Updated Branches:
  refs/heads/master a65feb569 -> 17ec15584


CRUNCH-68: Fix command line parser for examples.

Use GenericOptionsParser to deal with Hadoop options.
Make examples exit with appropriate status codes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/17ec1558
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/17ec1558
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/17ec1558

Branch: refs/heads/master
Commit: 17ec1558455f2ba7ab099da5f028b4b6b01adda0
Parents: a65feb5
Author: Matthias Friedrich <ma...@mafr.de>
Authored: Wed Sep 19 20:22:27 2012 +0200
Committer: Matthias Friedrich <ma...@mafr.de>
Committed: Thu Sep 20 07:29:50 2012 +0200

----------------------------------------------------------------------
 .../apache/crunch/examples/AverageBytesByIP.java   |   17 +++++++++-----
 .../org/apache/crunch/examples/TotalBytesByIP.java |   17 +++++++++-----
 .../java/org/apache/crunch/examples/WordCount.java |   17 +++++++++-----
 3 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
index 868e38a..52b542a 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
@@ -29,6 +29,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +48,9 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
   static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
 
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+    if (remainingArgs.length != 3) {
       System.err.println();
       System.err.println("Two and only two arguments are accepted.");
       System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
@@ -58,7 +61,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
     // Create an object to coordinate pipeline creation and execution.
     Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf());
     // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[0]);
+    PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
 
     // Combiner used for summing up response size and count
     CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = CombineFn.pairAggregator(CombineFn.SUM_LONGS,
@@ -75,10 +78,11 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
         Writables.tableOf(Writables.strings(), Writables.doubles()));
 
     // write the result to a text file
-    pipeline.writeTextFile(avgs, args[1]);
+    pipeline.writeTextFile(avgs, remainingArgs[2]);
     // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
+    PipelineResult result = pipeline.done();
+
+    return result.succeeded() ? 0 : 1;
   }
 
   // Function to calculate the average response size for a given ip address
@@ -129,6 +133,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
   };
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
+    int result = ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
+    System.exit(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
index 1953e3a..59b05fa 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
@@ -28,6 +28,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +47,9 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
   static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
 
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+    if (remainingArgs.length != 3) {
       System.err.println();
       System.err.println("Two and only two arguments are accepted.");
       System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
@@ -57,7 +60,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
     // Create an object to coordinate pipeline creation and execution.
     Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());
     // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[0]);
+    PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
 
     // Combiner used for summing up response size
     CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();
@@ -67,10 +70,11 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
         .parallelDo(extractIPResponseSize, Writables.tableOf(Writables.strings(), Writables.longs())).groupByKey()
         .combineValues(longSumCombiner);
 
-    pipeline.writeTextFile(ipAddrResponseSize, args[1]);
+    pipeline.writeTextFile(ipAddrResponseSize, remainingArgs[2]);
     // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
+    PipelineResult result = pipeline.done();
+
+    return result.succeeded() ? 0 : 1;
   }
 
   // Function to parse apache log records
@@ -101,6 +105,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
   };
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
+    int result = ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
+    System.exit(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
index e4ce25b..31d99d3 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java
@@ -24,6 +24,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,9 @@ import org.apache.hadoop.util.ToolRunner;
 
 public class WordCount extends Configured implements Tool, Serializable {
   public int run(String[] args) throws Exception {
-    if (args.length != 3) {
+    String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+    if (remainingArgs.length != 3) {
       System.err.println();
       System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
       System.err.println();
@@ -44,7 +47,7 @@ public class WordCount extends Configured implements Tool, Serializable {
     // Create an object to coordinate pipeline creation and execution.
     Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
     // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[1]);
+    PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]);
 
     // Define a function that splits each line in a PCollection of Strings into
     // a
@@ -64,13 +67,15 @@ public class WordCount extends Configured implements Tool, Serializable {
     PTable<String, Long> counts = words.count();
 
     // Instruct the pipeline to write the resulting counts to a text file.
-    pipeline.writeTextFile(counts, args[2]);
+    pipeline.writeTextFile(counts, remainingArgs[2]);
     // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
+    PipelineResult result = pipeline.done();
+
+    return result.succeeded() ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordCount(), args);
+    int result = ToolRunner.run(new Configuration(), new WordCount(), args);
+    System.exit(result);
   }
 }