You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/01/28 16:29:16 UTC
[beam] branch master updated: [BEAM-6523] Add transcoding extension
for Python flatten tests
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e22eb0 [BEAM-6523] Add transcoding extension for Python flatten tests
new 3b9eab9 Merge pull request #7646: [BEAM-6523] Add transcoding extension for Python flatten tests
8e22eb0 is described below
commit 8e22eb02212168f1c250be4dc154f7233e3227e5
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Jan 28 14:31:42 2019 +0100
[BEAM-6523] Add transcoding extension for Python flatten tests
The Flink Runner does not support non-matching union coders yet, although
transcoding is supported when the SDK harness performs the flatten operation.
---
.../runners/portability/flink_runner_test.py | 6 ++++++
.../runners/portability/fn_api_runner_test.py | 22 ++++++++++++++++------
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index cc267f2..65845cd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -162,6 +162,12 @@ if __name__ == '__main__':
def test_error_traceback_includes_user_code(self):
raise unittest.SkipTest("BEAM-6019")
+ def test_flattened_side_input(self):
+ # Blocked on support for transcoding
+ # https://jira.apache.org/jira/browse/BEAM-6523
+ super(FlinkRunnerTest, self).test_flattened_side_input(
+ with_transcoding=False)
+
def test_metrics(self):
"""Run a simple DoFn that increments a counter, and verify that its
expected value is written to a temporary file by the FileReporter"""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 1e630be..cf37ec4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -188,12 +188,17 @@ class FnApiRunnerTest(unittest.TestCase):
(9, list(range(7, 10)))]),
label='windowed')
- def test_flattened_side_input(self):
+ def test_flattened_side_input(self, with_transcoding=True):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
side1 = p | 'side1' >> beam.Create([('a', 1)])
side2 = p | 'side2' >> beam.Create([('b', 2)])
- side3 = p | 'side3' >> beam.Create([('b', 3)])
+ if with_transcoding:
+ # Also test non-matching coder types (transcoding required)
+ third_element = [('another_type')]
+ else:
+ third_element = [('b', 3)]
+ side3 = p | 'side3' >> beam.Create(third_element)
side = (side1, side2) | beam.Flatten()
assert_that(
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
@@ -201,7 +206,7 @@ class FnApiRunnerTest(unittest.TestCase):
label='CheckFlattenAsSideInput')
assert_that(
(side, side3) | 'FlattenAfter' >> beam.Flatten(),
- equal_to([('a', 1), ('b', 2), ('b', 3)]),
+ equal_to([('a', 1), ('b', 2)] + third_element),
label='CheckFlattenOfSideInput')
def test_gbk_side_input(self):
@@ -360,12 +365,17 @@ class FnApiRunnerTest(unittest.TestCase):
assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
equal_to([1, 2, 3]))
- def test_flatten(self):
+ def test_flatten(self, with_transcoding=True):
with self.create_pipeline() as p:
+ if with_transcoding:
+ # Additional element which does not match with the first type
+ additional = [ord('d')]
+ else:
+ additional = ['d']
res = (p | 'a' >> beam.Create(['a']),
p | 'bc' >> beam.Create(['b', 'c']),
- p | 'd' >> beam.Create(['d'])) | beam.Flatten()
- assert_that(res, equal_to(['a', 'b', 'c', 'd']))
+ p | 'd' >> beam.Create(additional)) | beam.Flatten()
+ assert_that(res, equal_to(['a', 'b', 'c'] + additional))
def test_combine_per_key(self):
with self.create_pipeline() as p: