You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/12 00:54:52 UTC
[beam] branch release-2.10.0 updated: [BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.10.0 by this push:
new de94544 [BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.
de94544 is described below
commit de94544ac70e70c48b407f4b440e08661db59b94
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Jan 10 14:22:19 2019 +0100
[BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.
The type declarations were already present but were not being applied due to a
longstanding TODO. This doesn't resolve that TODO completely, but it fixes
a large number of cases, including the Reshuffle case addressed here.
---
sdks/python/apache_beam/pipeline.py | 5 +++--
sdks/python/apache_beam/runners/portability/fn_api_runner_test.py | 6 ++++++
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 577e773..20ac5f0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -544,11 +544,12 @@ class Pipeline(object):
def _infer_result_type(self, transform, inputs, result_pcollection):
# TODO(robertwb): Multi-input, multi-output inference.
- # TODO(robertwb): Ideally we'd do intersection here.
type_options = self._options.view_as(TypeOptions)
if (type_options is not None and type_options.pipeline_type_check
and isinstance(result_pcollection, pvalue.PCollection)
- and not result_pcollection.element_type):
+ and (not result_pcollection.element_type
+ # TODO(robertwb): Ideally we'd do intersection here.
+ or result_pcollection.element_type == typehints.Any)):
input_element_type = (
inputs[0].element_type
if len(inputs) == 1
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 3fecbe5..dc248d5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -363,6 +363,12 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))
+ # Runners may special case the Reshuffle transform urn.
+ def test_reshuffle(self):
+ with self.create_pipeline() as p:
+ assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
+ equal_to([1, 2, 3]))
+
def test_flatten(self):
with self.create_pipeline() as p:
res = (p | 'a' >> beam.Create(['a']),