You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/16 21:10:57 UTC

[beam] branch master updated: Enable input subscription in Python streaming wordcount (#4864)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a7bc72d  Enable input subscription in Python streaming wordcount (#4864)
a7bc72d is described below

commit a7bc72d4a3493d59d88923a99762031a1d81b07d
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Fri Mar 16 14:10:54 2018 -0700

    Enable input subscription in Python streaming wordcount (#4864)
    
    * Enable input subscription in Python streaming wordcount
---
 .../apache_beam/examples/streaming_wordcount.py       | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 43d19f3..12f7351 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -43,13 +43,18 @@ def run(argv=None):
   """Build and run the pipeline."""
   parser = argparse.ArgumentParser()
   parser.add_argument(
-      '--input_topic', required=True,
-      help=('Input PubSub topic of the form '
-            '"projects/<PROJECT>/topics/<TOPIC>".'))
-  parser.add_argument(
       '--output_topic', required=True,
       help=('Output PubSub topic of the form '
             '"projects/<PROJECT>/topic/<TOPIC>".'))
+  group = parser.add_mutually_exclusive_group(required=True)
+  group.add_argument(
+      '--input_topic',
+      help=('Input PubSub topic of the form '
+            '"projects/<PROJECT>/topics/<TOPIC>".'))
+  group.add_argument(
+      '--input_subscription',
+      help=('Input PubSub subscription of the form '
+            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
   known_args, pipeline_args = parser.parse_known_args(argv)
   options = PipelineOptions(pipeline_args)
   options.view_as(StandardOptions).streaming = True
@@ -57,7 +62,11 @@ def run(argv=None):
   with beam.Pipeline(options=options) as p:
 
     # Read from PubSub into a PCollection.
-    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
+    if known_args.input_subscription:
+      lines = p | beam.io.ReadStringsFromPubSub(
+          subscription=known_args.input_subscription)
+    else:
+      lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
 
     # Capitalize the characters in each line.
     def count_ones(word_ones):

-- 
To stop receiving notification emails like this one, please contact
altay@apache.org.