You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/24 20:48:56 UTC

[beam] branch master updated: Remove usages of WriteStringsToPubSub in examples

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new feb2b84  Remove usages of WriteStringsToPubSub in examples
     new b0e3d36  Merge pull request #13615 from [BEAM-11524] Remove usages of WriteStringsToPubSub in examples
feb2b84 is described below

commit feb2b84c6448eaa2750bf3166c4c0d109ed8c69e
Author: Brian Hulette <bh...@google.com>
AuthorDate: Thu Dec 24 08:24:39 2020 -0800

    Remove usages of WriteStringsToPubSub in examples
---
 sdks/python/apache_beam/examples/snippets/snippets.py      |  6 +++---
 sdks/python/apache_beam/examples/snippets/snippets_test.py | 10 +++++-----
 sdks/python/apache_beam/examples/sql_taxi.py               |  3 ++-
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index a5d2b7b..42e5886 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -697,12 +697,12 @@ def examples_wordcount_streaming(argv):
         | 'Group' >> beam.GroupByKey()
         |
         'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
-        |
-        'Format' >> beam.Map(lambda word_and_count: '%s: %d' % word_and_count))
+        | 'Format' >>
+        beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8')))
 
     # [START example_wordcount_streaming_write]
     # Write to Pub/Sub
-    output | beam.io.WriteStringsToPubSub(known_args.output_topic)
+    output | beam.io.WriteToPubSub(known_args.output_topic)
     # [END example_wordcount_streaming_write]
 
 
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 238ecc8..35c60e7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -748,7 +748,7 @@ class SnippetsTest(unittest.TestCase):
 
   @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
   @mock.patch('apache_beam.io.ReadFromPubSub')
-  @mock.patch('apache_beam.io.WriteStringsToPubSub')
+  @mock.patch('apache_beam.io.WriteToPubSub')
   def test_examples_wordcount_streaming(self, *unused_mocks):
     def FakeReadFromPubSub(topic=None, subscription=None, values=None):
       expected_topic = topic
@@ -768,7 +768,7 @@ class SnippetsTest(unittest.TestCase):
       def expand(self, pcoll):
         assert_that(pcoll, self.matcher)
 
-    def FakeWriteStringsToPubSub(topic=None, values=None):
+    def FakeWriteToPubSub(topic=None, values=None):
       expected_topic = topic
 
       def _inner(topic=None, subscription=None):
@@ -785,11 +785,11 @@ class SnippetsTest(unittest.TestCase):
         TimestampedValue(b'a b c c c', 20)
     ]
     output_topic = 'projects/fake-beam-test-project/topic/outtopic'
-    output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3']
+    output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3']
     beam.io.ReadFromPubSub = (
         FakeReadFromPubSub(topic=input_topic, values=input_values))
-    beam.io.WriteStringsToPubSub = (
-        FakeWriteStringsToPubSub(topic=output_topic, values=output_values))
+    beam.io.WriteToPubSub = (
+        FakeWriteToPubSub(topic=output_topic, values=output_values))
     snippets.examples_wordcount_streaming([
         '--input_topic',
         'projects/fake-beam-test-project/topic/intopic',
diff --git a/sdks/python/apache_beam/examples/sql_taxi.py b/sdks/python/apache_beam/examples/sql_taxi.py
index 607dea1..32fd80a 100644
--- a/sdks/python/apache_beam/examples/sql_taxi.py
+++ b/sdks/python/apache_beam/examples/sql_taxi.py
@@ -80,7 +80,8 @@ def run(output_topic, pipeline_args):
                 "window_end": window.end.to_rfc3339()
             })
         | "Convert to JSON" >> beam.Map(json.dumps)
-        | beam.io.WriteStringsToPubSub(topic=output_topic))
+        | "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
+        | beam.io.WriteToPubSub(topic=output_topic))
 
 
 if __name__ == '__main__':