You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/24 20:48:56 UTC
[beam] branch master updated: Remove usages of WriteStringsToPubSub
in examples
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new feb2b84 Remove usages of WriteStringsToPubSub in examples
new b0e3d36 Merge pull request #13615 from [BEAM-11524] Remove usages of WriteStringsToPubSub in examples
feb2b84 is described below
commit feb2b84c6448eaa2750bf3166c4c0d109ed8c69e
Author: Brian Hulette <bh...@google.com>
AuthorDate: Thu Dec 24 08:24:39 2020 -0800
Remove usages of WriteStringsToPubSub in examples
---
sdks/python/apache_beam/examples/snippets/snippets.py | 6 +++---
sdks/python/apache_beam/examples/snippets/snippets_test.py | 10 +++++-----
sdks/python/apache_beam/examples/sql_taxi.py | 3 ++-
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index a5d2b7b..42e5886 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -697,12 +697,12 @@ def examples_wordcount_streaming(argv):
| 'Group' >> beam.GroupByKey()
|
'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
- |
- 'Format' >> beam.Map(lambda word_and_count: '%s: %d' % word_and_count))
+ | 'Format' >>
+ beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8')))
# [START example_wordcount_streaming_write]
# Write to Pub/Sub
- output | beam.io.WriteStringsToPubSub(known_args.output_topic)
+ output | beam.io.WriteToPubSub(known_args.output_topic)
# [END example_wordcount_streaming_write]
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 238ecc8..35c60e7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -748,7 +748,7 @@ class SnippetsTest(unittest.TestCase):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('apache_beam.io.ReadFromPubSub')
- @mock.patch('apache_beam.io.WriteStringsToPubSub')
+ @mock.patch('apache_beam.io.WriteToPubSub')
def test_examples_wordcount_streaming(self, *unused_mocks):
def FakeReadFromPubSub(topic=None, subscription=None, values=None):
expected_topic = topic
@@ -768,7 +768,7 @@ class SnippetsTest(unittest.TestCase):
def expand(self, pcoll):
assert_that(pcoll, self.matcher)
- def FakeWriteStringsToPubSub(topic=None, values=None):
+ def FakeWriteToPubSub(topic=None, values=None):
expected_topic = topic
def _inner(topic=None, subscription=None):
@@ -785,11 +785,11 @@ class SnippetsTest(unittest.TestCase):
TimestampedValue(b'a b c c c', 20)
]
output_topic = 'projects/fake-beam-test-project/topic/outtopic'
- output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3']
+ output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3']
beam.io.ReadFromPubSub = (
FakeReadFromPubSub(topic=input_topic, values=input_values))
- beam.io.WriteStringsToPubSub = (
- FakeWriteStringsToPubSub(topic=output_topic, values=output_values))
+ beam.io.WriteToPubSub = (
+ FakeWriteToPubSub(topic=output_topic, values=output_values))
snippets.examples_wordcount_streaming([
'--input_topic',
'projects/fake-beam-test-project/topic/intopic',
diff --git a/sdks/python/apache_beam/examples/sql_taxi.py b/sdks/python/apache_beam/examples/sql_taxi.py
index 607dea1..32fd80a 100644
--- a/sdks/python/apache_beam/examples/sql_taxi.py
+++ b/sdks/python/apache_beam/examples/sql_taxi.py
@@ -80,7 +80,8 @@ def run(output_topic, pipeline_args):
"window_end": window.end.to_rfc3339()
})
| "Convert to JSON" >> beam.Map(json.dumps)
- | beam.io.WriteStringsToPubSub(topic=output_topic))
+ | "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
+ | beam.io.WriteToPubSub(topic=output_topic))
if __name__ == '__main__':