You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/08 21:58:59 UTC
[2/4] incubator-beam git commit: Revise WordCount example to be
better cross-runner example
Revise WordCount example to be better cross-runner example
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b05a8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b05a8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b05a8c7
Branch: refs/heads/master
Commit: 0b05a8c7ff8e1f76516a6b13d504f776b5c9111e
Parents: c64cf36
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 14:19:47 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 8 13:51:25 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCount.java | 62 ++++++++------------
1 file changed, 23 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b05a8c7/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e7eab6e..5be0ddc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -17,15 +17,13 @@
*/
package org.apache.beam.examples;
-import com.google.common.base.Strings;
-import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
@@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection;
* </a>
*
* <p>Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to GCS.
+ * Reading text files; counting a PCollection; writing to text files.
*
* <p>New Concepts:
* <pre>
@@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection;
* 4. Defining your own pipeline options
* </pre>
*
- * <p>Concept #1: you can execute this pipeline either locally or using the selected runner.
+ * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner.
* These are now command-line options and not hard-coded as they were in the MinimalWordCount
* example.
- * To execute this pipeline locally, specify a local output file or output prefix on GCS:
- * <pre>{@code
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
*
* <p>To change the runner, specify:
* <pre>{@code
* --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
- * See examples/java/README.md for instructions about how to configure different runners.
*
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
+ * <p>To execute this pipeline, specify a local output file (if using the
+ * {@code DirectRunner}) or output prefix on a supported distributed file system.
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*/
public class WordCount {
/**
- * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
- * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
- * pipeline.
+ * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+ * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
+ * to a ParDo in the pipeline.
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
@@ -153,36 +150,23 @@ public class WordCount {
* <p>Inherits standard configuration options.
*/
public interface WordCountOptions extends PipelineOptions {
+
+ /**
+ * By default, this example reads from a public dataset containing the text of
+ * King Lear. Set this option to choose a different input file or glob.
+ */
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
+ /**
+ * Set this required option to specify where to write the output.
+ */
@Description("Path of the file to write to")
- @Default.InstanceFactory(OutputFactory.class)
+ @Required
String getOutput();
void setOutput(String value);
-
- /**
- * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination.
- */
- class OutputFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- String tempLocation = options.getTempLocation();
- if (!Strings.isNullOrEmpty(tempLocation)) {
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- return factory.resolve(tempLocation, "counts.txt");
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve temp location: %s", tempLocation));
- }
- } else {
- throw new IllegalArgumentException("Must specify --output or --tempLocation");
- }
- }
- }
}
public static void main(String[] args) {