You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 21:58:59 UTC

[2/4] incubator-beam git commit: Revise WordCount example to be better cross-runner example

Revise WordCount example to be better cross-runner example


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b05a8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b05a8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b05a8c7

Branch: refs/heads/master
Commit: 0b05a8c7ff8e1f76516a6b13d504f776b5c9111e
Parents: c64cf36
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:19:47 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 8 13:51:25 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCount.java     | 62 ++++++++------------
 1 file changed, 23 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b05a8c7/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e7eab6e..5be0ddc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -17,15 +17,13 @@
  */
 package org.apache.beam.examples;
 
-import com.google.common.base.Strings;
-import java.io.IOException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection;
  *   </a>
  *
  * <p>Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to GCS.
+ * Reading text files; counting a PCollection; writing to text files.
  *
  * <p>New Concepts:
  * <pre>
@@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection;
  *   4. Defining your own pipeline options
  * </pre>
  *
- * <p>Concept #1: you can execute this pipeline either locally or using the selected runner.
+ * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner.
  * These are now command-line options and not hard-coded as they were in the MinimalWordCount
  * example.
- * To execute this pipeline locally, specify a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
  *
  * <p>To change the runner, specify:
  * <pre>{@code
  *   --runner=YOUR_SELECTED_RUNNER
  * }
  * </pre>
- * See examples/java/README.md for instructions about how to configure different runners.
  *
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
+ * <p>To execute this pipeline, specify a local output file (if using the
+ * {@code DirectRunner}) or an output prefix on a supported distributed file system.
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
  */
 public class WordCount {
 
   /**
-   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
-   * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
-   * pipeline.
+   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
+   * to a ParDo in the pipeline.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
@@ -153,36 +150,23 @@ public class WordCount {
    * <p>Inherits standard configuration options.
    */
   public interface WordCountOptions extends PipelineOptions {
+
+    /**
+     * By default, this example reads from a public dataset containing the text of
+     * King Lear. Set this option to choose a different input file or glob.
+     */
     @Description("Path of the file to read from")
     @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
     String getInputFile();
     void setInputFile(String value);
 
+    /**
+     * Set this required option to specify where to write the output.
+     */
     @Description("Path of the file to write to")
-    @Default.InstanceFactory(OutputFactory.class)
+    @Required
     String getOutput();
     void setOutput(String value);
-
-    /**
-     * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination.
-     */
-    class OutputFactory implements DefaultValueFactory<String> {
-      @Override
-      public String create(PipelineOptions options) {
-        String tempLocation = options.getTempLocation();
-        if (!Strings.isNullOrEmpty(tempLocation)) {
-          try {
-            IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-            return factory.resolve(tempLocation, "counts.txt");
-          } catch (IOException e) {
-            throw new RuntimeException(
-                String.format("Failed to resolve temp location: %s", tempLocation));
-          }
-        } else {
-          throw new IllegalArgumentException("Must specify --output or --tempLocation");
-        }
-      }
-    }
   }
 
   public static void main(String[] args) {