You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/01/28 16:29:16 UTC

[beam] branch master updated: [BEAM-6523] Add transcoding extension for Python flatten tests

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e22eb0  [BEAM-6523] Add transcoding extension for Python flatten tests
     new 3b9eab9  Merge pull request #7646: [BEAM-6523] Add transcoding extension for Python flatten tests
8e22eb0 is described below

commit 8e22eb02212168f1c250be4dc154f7233e3227e5
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Jan 28 14:31:42 2019 +0100

    [BEAM-6523] Add transcoding extension for Python flatten tests
    
    The Flink Runner does not support non-matching union coders yet, although
    transcoding is supported when the SDK harness performs the flatten operation.
---
 .../runners/portability/flink_runner_test.py       |  6 ++++++
 .../runners/portability/fn_api_runner_test.py      | 22 ++++++++++++++++------
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index cc267f2..65845cd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -162,6 +162,12 @@ if __name__ == '__main__':
     def test_error_traceback_includes_user_code(self):
       raise unittest.SkipTest("BEAM-6019")
 
+    def test_flattened_side_input(self):
+      # Blocked on support for transcoding
+      # https://jira.apache.org/jira/browse/BEAM-6523
+      super(FlinkRunnerTest, self).test_flattened_side_input(
+          with_transcoding=False)
+
     def test_metrics(self):
       """Run a simple DoFn that increments a counter, and verify that its
        expected value is written to a temporary file by the FileReporter"""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 1e630be..cf37ec4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -188,12 +188,17 @@ class FnApiRunnerTest(unittest.TestCase):
               (9, list(range(7, 10)))]),
           label='windowed')
 
-  def test_flattened_side_input(self):
+  def test_flattened_side_input(self, with_transcoding=True):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
       side1 = p | 'side1' >> beam.Create([('a', 1)])
       side2 = p | 'side2' >> beam.Create([('b', 2)])
-      side3 = p | 'side3' >> beam.Create([('b', 3)])
+      if with_transcoding:
+        # Also test non-matching coder types (transcoding required)
+        third_element = [('another_type')]
+      else:
+        third_element = [('b', 3)]
+      side3 = p | 'side3' >> beam.Create(third_element)
       side = (side1, side2) | beam.Flatten()
       assert_that(
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
@@ -201,7 +206,7 @@ class FnApiRunnerTest(unittest.TestCase):
           label='CheckFlattenAsSideInput')
       assert_that(
           (side, side3) | 'FlattenAfter' >> beam.Flatten(),
-          equal_to([('a', 1), ('b', 2), ('b', 3)]),
+          equal_to([('a', 1), ('b', 2)] + third_element),
           label='CheckFlattenOfSideInput')
 
   def test_gbk_side_input(self):
@@ -360,12 +365,17 @@ class FnApiRunnerTest(unittest.TestCase):
       assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
                   equal_to([1, 2, 3]))
 
-  def test_flatten(self):
+  def test_flatten(self, with_transcoding=True):
     with self.create_pipeline() as p:
+      if with_transcoding:
+        # Additional element which does not match with the first type
+        additional = [ord('d')]
+      else:
+        additional = ['d']
       res = (p | 'a' >> beam.Create(['a']),
              p | 'bc' >> beam.Create(['b', 'c']),
-             p | 'd' >> beam.Create(['d'])) | beam.Flatten()
-      assert_that(res, equal_to(['a', 'b', 'c', 'd']))
+             p | 'd' >> beam.Create(additional)) | beam.Flatten()
+      assert_that(res, equal_to(['a', 'b', 'c'] + additional))
 
   def test_combine_per_key(self):
     with self.create_pipeline() as p: