You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/02 00:21:42 UTC

[2/2] incubator-beam git commit: Replace PubsubIO and injector with TextIO in beam-examples

Replace PubsubIO and injector with TextIO in beam-examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/40d8072c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/40d8072c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/40d8072c

Branch: refs/heads/master
Commit: 40d8072c2d03dd80a7e644cb1587c765b1477d5a
Parents: 85c36b8
Author: Pei He <pe...@google.com>
Authored: Wed Jun 29 13:01:10 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 1 17:21:37 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/StreamingWordExtract.java | 35 ++++++--------------
 1 file changed, 10 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/40d8072c/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index aca4222..4ea199c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -19,11 +19,10 @@ package org.apache.beam.examples.complete;
 
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
-import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -40,22 +39,14 @@ import java.util.ArrayList;
 /**
  * A streaming Dataflow Example using BigQuery output.
  *
- * <p>This pipeline example reads lines of text from a PubSub topic, splits each line
+ * <p>This pipeline example reads lines from the input text file, splits each line
  * into individual words, capitalizes those words, and writes the output to
  * a BigQuery table.
  *
- * <p>By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
+ * <p>The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
+ * options. If the BigQuery table does not exist, the example will try to create it.
  *
  * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
  * and then exits.
@@ -110,9 +101,8 @@ public class StreamingWordExtract {
    *
    * <p>Inherits standard configuration options.
    */
-  private interface StreamingWordExtractOptions
-      extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
+  private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions {
+    @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
     String getInputFile();
     void setInputFile(String value);
@@ -141,7 +131,7 @@ public class StreamingWordExtract {
         .append(options.getBigQueryTable())
         .toString();
     pipeline
-        .apply(PubsubIO.Read.topic(options.getPubsubTopic()))
+        .apply("ReadLines", TextIO.Read.from(options.getInputFile()))
         .apply(ParDo.of(new ExtractWords()))
         .apply(ParDo.of(new Uppercase()))
         .apply(ParDo.of(new StringToRowConverter()))
@@ -150,12 +140,7 @@ public class StreamingWordExtract {
 
     PipelineResult result = pipeline.run();
 
-    if (!options.getInputFile().isEmpty()) {
-      // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-      dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
-    }
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
+    // dataflowUtils will try to cancel the pipeline before the program exits.
     dataflowUtils.waitToFinish(result);
   }
 }