Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/10 22:07:05 UTC

[2/4] incubator-beam git commit: Update examples archetype to match examples

Update examples archetype to match examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e132ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e132ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e132ee8

Branch: refs/heads/master
Commit: 1e132ee83d5f393498c12003a328e51d0e93bd06
Parents: 9f78c44
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 10 12:06:42 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 10 14:06:39 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/DebuggingWordCount.java       | 69 +++++---------------
 .../src/main/java/MinimalWordCount.java         | 52 +++++++--------
 .../src/main/java/WindowedWordCount.java        |  6 +-
 .../src/main/java/WordCount.java                | 64 +++++++-----------
 .../common/ExampleBigQueryTableOptions.java     |  2 +-
 .../src/main/java/common/ExampleOptions.java    |  5 ++
 ...xamplePubsubTopicAndSubscriptionOptions.java |  2 +-
 .../java/common/ExamplePubsubTopicOptions.java  |  2 +-
 .../src/main/java/common/ExampleUtils.java      |  3 +-
 .../src/test/java/DebuggingWordCountTest.java   | 15 ++++-
 .../src/test/java/WordCountTest.java            |  7 +-
 11 files changed, 91 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 9727379..99ae796 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory;
  *
  * <p>New Concepts:
  * <pre>
- *   1. Logging to Cloud Logging
- *   2. Controlling worker log levels
- *   3. Creating a custom aggregator
- *   4. Testing your Pipeline via PAssert
+ *   1. Logging using SLF4J, even in a distributed environment
+ *   2. Creating a custom aggregator (runners have varying levels of support)
+ *   3. Testing your Pipeline via PAssert
  * </pre>
  *
  * <p>To execute this pipeline locally, specify general pipeline configuration:
@@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  *
- * <p>To use the additional logging discussed below, specify:
- * <pre>{@code
- *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * }
- * </pre>
- *
- * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
- * the quotations as appropriate for your shell. For example, in <code>bash</code>:
- * <pre>
- * mvn compile exec:java ... \
- *   -Dexec.args="... \
- *     --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}"
- * </pre>
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
  *
- * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
- * Logging by default at "INFO" log level and higher. One may override log levels for specific
- * logging namespaces by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * </code></pre>
- * For example, by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
- * </code></pre>
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
- * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in
- * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
- * logging configuration can be overridden by specifying
- * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
- * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
- * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
- * that changing the default worker log level to TRACE or DEBUG will significantly increase
- * the amount of logs output.
- *
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
  */
 public class DebuggingWordCount {
   /** A DoFn that filters for a specific key based upon a regular expression. */
   public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
     /**
-     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
-     * as the logger. All log statements emitted by this logger will be referenced by this name
-     * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
-     * about the Cloud Logging UI.
+     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the
+     * logger. Depending on your SLF4J configuration, log statements will likely be qualified by
+     * this name.
+     *
+     * <p>Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J
+     * configuration that is most appropriate for their logging integration.
      */
     private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
 
@@ -122,11 +90,9 @@ public class DebuggingWordCount {
     }
 
     /**
-     * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
-     * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
-     * Dataflow service. These aggregators below track the number of matched and unmatched words.
-     * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
-     * the Dataflow Monitoring UI.
+     * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
+     * runner provides varying levels of support for aggregators, and may expose them
+     * in a dashboard, etc.
      */
     private final Aggregator<Long, Long> matchedWords =
         createAggregator("matchedWords", new Sum.SumLongFn());
@@ -137,8 +103,7 @@ public class DebuggingWordCount {
     public void processElement(ProcessContext c) {
       if (filter.matcher(c.element().getKey()).matches()) {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
-        // using the Dataflow service, these log lines will appear in the Cloud Logging UI
-        // only if the log level is set to "DEBUG" or lower.
+        // these log lines will appear only if the log level is set to "DEBUG" or lower.
         LOG.debug("Matched: " + c.element().getKey());
         matchedWords.addValue(1L);
         c.output(c.element());
@@ -178,7 +143,7 @@ public class DebuggingWordCount {
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 
     /**
-     * Concept #4: PAssert is a set of convenient PTransforms in the style of
+     * Concept #3: PAssert is a set of convenient PTransforms in the style of
      * Hamcrest's collection matchers that can be used when writing Pipeline level tests
      * to validate the contents of PCollections. PAssert is best used in unit tests
      * with small data sets but is demonstrated here as a teaching tool.
@@ -194,6 +159,6 @@ public class DebuggingWordCount {
         KV.of("stomach", 1L));
     PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
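
A rough sketch of the concepts the updated javadoc above describes -- plain SLF4J
logging inside a DoFn plus a custom aggregator -- written against the same API style
that appears in this file; the class and names below are illustrative only, not code
from this commit:

    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Sum;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Illustrative DoFn that logs and counts the non-empty lines it passes through. */
    class KeepNonEmptyFn extends DoFn<String, String> {
      // Standard SLF4J usage; each runner decides where these messages end up.
      private static final Logger LOG = LoggerFactory.getLogger(KeepNonEmptyFn.class);

      // Custom aggregator; runners expose these with varying levels of support.
      private final Aggregator<Long, Long> nonEmptyLines =
          createAggregator("nonEmptyLines", new Sum.SumLongFn());

      @ProcessElement
      public void processElement(ProcessContext c) {
        if (!c.element().isEmpty()) {
          LOG.debug("Keeping: " + c.element()); // visible only at DEBUG or lower
          nonEmptyLines.addValue(1L);
          c.output(c.element());
        }
      }
    }

As in the main() above, PAssert.that(...).containsInAnyOrder(...) can then be used to
check the resulting PCollection before the pipeline is run.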

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index e8497c0..97bd824 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -37,46 +37,33 @@ import org.apache.beam.sdk.values.KV;
  * argument processing, and focus on construction of the pipeline, which chains together the
  * application of core transforms.
  *
- * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
- * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
+ * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
+ * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
  * concepts.
  *
  * <p>Concepts:
+ *
  * <pre>
  *   1. Reading data from text files
  *   2. Specifying 'inline' transforms
- *   3. Counting a PCollection
- *   4. Writing data to Cloud Storage as text files
+ *   3. Counting items in a PCollection
+ *   4. Writing data to text files
  * </pre>
  *
- * <p>To execute this pipeline, first edit the code to set your project ID, the temp
- * location, and the output location. The specified GCS bucket(s) must already exist.
- *
- * <p>Then, run the pipeline as described in the README. It will be deployed and run with the
- * selected runner. No args are required to run the pipeline. You can see the results in your
- * output bucket in the GCS browser.
+ * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
+ * can see the results in the output files in your current working directory, with names like
+ * "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate
+ * file service.
  */
 public class MinimalWordCount {
 
   public static void main(String[] args) {
     // Create a PipelineOptions object. This object lets us set various execution
-    // options for our pipeline, such as the associated Cloud Platform project and the location
-    // in Google Cloud Storage to stage files.
+    // options for our pipeline, such as the runner you wish to use. This example
+    // will run with the DirectRunner by default, based on the class path configured
+    // in its dependencies.
     PipelineOptions options = PipelineOptionsFactory.create();
 
-    // In order to run your pipeline, you need to make following runner specific changes:
-    //
-    // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner.
-    // CHANGE 2/3: Specify runner-required options.
-    // For DataflowRunner, set project and temp location as follows:
-    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-    //   dataflowOptions.setRunner(DataflowRunner.class);
-    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
-    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
-    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
-    // for more details.
-    //   options.setRunner(FlinkRunner.class);
-
     // Create the Pipeline object with the options we defined above.
     Pipeline p = Pipeline.create(options);
 
@@ -85,7 +72,10 @@ public class MinimalWordCount {
     // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
     // of input text files. TextIO.Read returns a PCollection where each element is one line from
     // the input text (a set of Shakespeare's texts).
+
+    // This example reads a public data set consisting of the complete works of Shakespeare.
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+
      // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
      // The ParDo returns a PCollection<String>, where each element is an individual word in
@@ -100,10 +90,12 @@ public class MinimalWordCount {
                          }
                        }
                      }))
+
      // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
      // transform returns a new PCollection of key/value pairs, where each key represents a unique
      // word in the text. The associated value is the occurrence count for that word.
      .apply(Count.<String>perElement())
+
      // Apply a MapElements transform that formats our PCollection of word counts into a printable
      // string, suitable for writing to an output file.
      .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@@ -112,13 +104,15 @@ public class MinimalWordCount {
                          return input.getKey() + ": " + input.getValue();
                        }
                      }))
+
      // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
      // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
-     // formatted strings) to a series of text files in Google Cloud Storage.
-     // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     // formatted strings) to a series of text files.
+     //
+     // By default, it will write to a set of files with names like wordcounts-00001-of-00005
+     .apply(TextIO.Write.to("wordcounts"));
 
     // Run the pipeline.
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
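
Since the runner now comes from the classpath rather than hard-coded DataflowRunner
settings, a reader who does want a different runner can parse options from the command
line instead of calling PipelineOptionsFactory.create(). A small sketch (illustrative
only; the class name and flag value are examples, and the chosen runner's artifact is
assumed to be on the classpath):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MinimalWordCountWithArgs {
      public static void main(String[] args) {
        // e.g. pass --runner=FlinkRunner as a program argument; with no flags,
        // the DirectRunner is used, exactly as in the example above.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        // ... apply the same transforms as MinimalWordCount ...
        p.run().waitUntilFinish();
      }
    }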

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index c92a37c..2812531 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -17,9 +17,6 @@
  */
 package ${package};
 
-import ${package}.common.ExampleBigQueryTableOptions;
-import ${package}.common.ExampleOptions;
-import ${package}.common.ExampleUtils;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -27,6 +24,9 @@ import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import ${package}.common.ExampleBigQueryTableOptions;
+import ${package}.common.ExampleOptions;
+import ${package}.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 80bfd3a..8fe7137 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -17,15 +17,13 @@
  */
 package ${package};
 
-import com.google.common.base.Strings;
-import java.io.IOException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection;
  *   </a>
  *
  * <p>Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to GCS.
+ * Reading text files; counting a PCollection; writing to text files
  *
  * <p>New Concepts:
  * <pre>
@@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection;
  *   4. Defining your own pipeline options
  * </pre>
  *
- * <p>Concept #1: you can execute this pipeline either locally or using the selected runner.
+ * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner.
  * These are now command-line options and not hard-coded as they were in the MinimalWordCount
  * example.
- * To execute this pipeline locally, specify a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
  *
  * <p>To change the runner, specify:
  * <pre>{@code
  *   --runner=YOUR_SELECTED_RUNNER
  * }
  * </pre>
- * See examples/java/README.md for instructions about how to configure different runners.
  *
- * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
- * and can be overridden with {@code --inputFile}.
+ * <p>To execute this pipeline, specify a local output file (if using the
+ * {@code DirectRunner}) or output prefix on a supported distributed file system.
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
  */
 public class WordCount {
 
   /**
-   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
-   * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
-   * pipeline.
+   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
+   * to a ParDo in the pipeline.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
@@ -153,36 +150,23 @@ public class WordCount {
    * <p>Inherits standard configuration options.
    */
   public interface WordCountOptions extends PipelineOptions {
+
+    /**
+     * By default, this example reads from a public dataset containing the text of
+     * King Lear. Set this option to choose a different input file or glob.
+     */
     @Description("Path of the file to read from")
     @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
     String getInputFile();
     void setInputFile(String value);
 
+    /**
+     * Set this required option to specify where to write the output.
+     */
     @Description("Path of the file to write to")
-    @Default.InstanceFactory(OutputFactory.class)
+    @Required
     String getOutput();
     void setOutput(String value);
-
-    /**
-     * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination.
-     */
-    public static class OutputFactory implements DefaultValueFactory<String> {
-      @Override
-      public String create(PipelineOptions options) {
-        String tempLocation = options.getTempLocation();
-        if (!Strings.isNullOrEmpty(tempLocation)) {
-          try {
-            IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-            return factory.resolve(tempLocation, "counts.txt");
-          } catch (IOException e) {
-            throw new RuntimeException(
-                String.format("Failed to resolve temp location: %s", tempLocation));
-          }
-        } else {
-          throw new IllegalArgumentException("Must specify --output or --tempLocation");
-        }
-      }
-    }
   }
 
   public static void main(String[] args) {
@@ -197,6 +181,6 @@ public class WordCount {
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
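
For context, option interfaces like WordCountOptions above are bound to the command
line in main() via PipelineOptionsFactory; with @Required in place, validation fails
fast when --output is missing, while --inputFile falls back to the King Lear default.
Paraphrased (not the literal code from this commit), the wiring looks like:

    WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);
    // ... build the pipeline as in WordCount above ...
    p.run().waitUntilFinish();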

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
index 96e8406..6b51074 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
@@ -46,7 +46,7 @@ public interface ExampleBigQueryTableOptions extends GcpOptions {
   /**
    * Returns the job name as the default BigQuery table name.
    */
-  static class BigQueryTableFactory implements DefaultValueFactory<String> {
+  class BigQueryTableFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
       return options.getJobName().replace('-', '_');
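
(The "static" modifier is dropped above because classes nested inside an interface are
implicitly public and static, so the change is purely cosmetic.) For readers new to
DefaultValueFactory, a sketch of how such a factory is wired up with
@Default.InstanceFactory; the option and class names here are made up for illustration:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.DefaultValueFactory;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface MyTableOptions extends PipelineOptions {
      @Description("Table name, defaulting to a value derived from the job name")
      @Default.InstanceFactory(TableNameFactory.class)
      String getTableName();
      void setTableName(String value);

      /** Computes the default lazily from the other options. */
      class TableNameFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          return options.getJobName().replace('-', '_');
        }
      }
    }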

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
index 221e266..90f935c 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
@@ -29,4 +29,9 @@ public interface ExampleOptions extends PipelineOptions {
   @Default.Boolean(false)
   boolean getKeepJobsRunning();
   void setKeepJobsRunning(boolean keepJobsRunning);
+
+  @Description("Number of workers to use when executing the injector pipeline")
+  @Default.Integer(1)
+  int getInjectorNumWorkers();
+  void setInjectorNumWorkers(int numWorkers);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
index e3fb132..daeb398 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -35,7 +35,7 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT
   /**
    * Returns a default Pub/Sub subscription based on the project and the job names.
    */
-  static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
+  class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
       return "projects/" + options.as(GcpOptions.class).getProject()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
index 1825267..936bff5 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
@@ -35,7 +35,7 @@ public interface ExamplePubsubTopicOptions extends GcpOptions {
   /**
    * Returns a default Pub/Sub topic based on the project and the job names.
    */
-  static class PubsubTopicFactory implements DefaultValueFactory<String> {
+  class PubsubTopicFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
       return "projects/" + options.as(GcpOptions.class).getProject()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
index c1b6489..570b382 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
@@ -272,8 +272,7 @@ public class ExampleUtils {
   }
 
   /**
-   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
-   * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
+   * Waits for the pipeline to finish and cancels it before the program exits.
    */
   public void waitToFinish(PipelineResult result) {
     pipelinesToCancel.add(result);
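
The pattern the updated javadoc describes amounts to holding on to each PipelineResult
and cancelling whatever is still running on the way out; roughly (a sketch, not the
actual ExampleUtils implementation):

    PipelineResult result = p.run();
    // ... later, when the program is about to exit ...
    if (result.getState() == PipelineResult.State.RUNNING) {
      try {
        result.cancel();
      } catch (java.io.IOException e) {
        // Cancellation is best-effort.
      }
    }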

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java
index dfa1a75..155242d 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java
@@ -20,6 +20,8 @@ package ${package};
 import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import ${package}.DebuggingWordCount.WordCountOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -35,9 +37,16 @@ public class DebuggingWordCountTest {
 
   @Test
   public void testDebuggingWordCount() throws Exception {
-    File file = tmpFolder.newFile();
-    Files.write("stomach secret Flourish message Flourish here Flourish", file,
+    File inputFile = tmpFolder.newFile();
+    File outputFile = tmpFolder.newFile();
+    Files.write(
+        "stomach secret Flourish message Flourish here Flourish",
+        inputFile,
         StandardCharsets.UTF_8);
-    DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()});
+    WordCountOptions options =
+        TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
+    options.setInputFile(inputFile.getAbsolutePath());
+    options.setOutput(outputFile.getAbsolutePath());
+    DebuggingWordCount.main(TestPipeline.convertToArgs(options));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
index 83d0f37..e86c2aa 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
@@ -17,12 +17,11 @@
  */
 package ${package};
 
+import java.util.Arrays;
+import java.util.List;
 import ${package}.WordCount.CountWords;
 import ${package}.WordCount.ExtractWordsFn;
 import ${package}.WordCount.FormatAsTextFn;
-
-import java.util.Arrays;
-import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
@@ -81,6 +80,6 @@ public class WordCountTest {
       .apply(MapElements.via(new FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
-    p.run();
+    p.run().waitUntilFinish();
   }
 }