You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/16 21:10:57 UTC
[beam] branch master updated: Enable input subscription in Python streaming wordcount (#4864)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a7bc72d Enable input subscription in Python streaming wordcount (#4864)
a7bc72d is described below
commit a7bc72d4a3493d59d88923a99762031a1d81b07d
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Fri Mar 16 14:10:54 2018 -0700
Enable input subscription in Python streaming wordcount (#4864)
* Enable input subscription in Python streaming wordcount
---
.../apache_beam/examples/streaming_wordcount.py | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 43d19f3..12f7351 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -43,13 +43,18 @@ def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
- '--input_topic', required=True,
- help=('Input PubSub topic of the form '
- '"projects/<PROJECT>/topics/<TOPIC>".'))
- parser.add_argument(
'--output_topic', required=True,
help=('Output PubSub topic of the form '
'"projects/<PROJECT>/topic/<TOPIC>".'))
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument(
+ '--input_topic',
+ help=('Input PubSub topic of the form '
+ '"projects/<PROJECT>/topics/<TOPIC>".'))
+ group.add_argument(
+ '--input_subscription',
+ help=('Input PubSub subscription of the form '
+ '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
known_args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(StandardOptions).streaming = True
@@ -57,7 +62,11 @@ def run(argv=None):
with beam.Pipeline(options=options) as p:
# Read from PubSub into a PCollection.
- lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
+ if known_args.input_subscription:
+ lines = p | beam.io.ReadStringsFromPubSub(
+ subscription=known_args.input_subscription)
+ else:
+ lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
# Capitalize the characters in each line.
def count_ones(word_ones):
--
To stop receiving notification emails like this one, please contact
altay@apache.org.