You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/12 00:54:52 UTC

[beam] branch release-2.10.0 updated: [BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.10.0 by this push:
     new de94544  [BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.
de94544 is described below

commit de94544ac70e70c48b407f4b440e08661db59b94
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Jan 10 14:22:19 2019 +0100

    [BEAM-6294] Ensure input and output coders are equal for reshuffle transforms.
    
    The type declarations were already present but were not being applied
    because of a long-standing TODO. This doesn't fully resolve that TODO,
    but it fixes a large number of cases, including the Reshuffle case here.
---
 sdks/python/apache_beam/pipeline.py                               | 5 +++--
 sdks/python/apache_beam/runners/portability/fn_api_runner_test.py | 6 ++++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 577e773..20ac5f0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -544,11 +544,12 @@ class Pipeline(object):
 
   def _infer_result_type(self, transform, inputs, result_pcollection):
     # TODO(robertwb): Multi-input, multi-output inference.
-    # TODO(robertwb): Ideally we'd do intersection here.
     type_options = self._options.view_as(TypeOptions)
     if (type_options is not None and type_options.pipeline_type_check
         and isinstance(result_pcollection, pvalue.PCollection)
-        and not result_pcollection.element_type):
+        and (not result_pcollection.element_type
+             # TODO(robertwb): Ideally we'd do intersection here.
+             or result_pcollection.element_type == typehints.Any)):
       input_element_type = (
           inputs[0].element_type
           if len(inputs) == 1
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 3fecbe5..dc248d5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -363,6 +363,12 @@ class FnApiRunnerTest(unittest.TestCase):
              | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
       assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))
 
+  # Runners may special-case the Reshuffle URN; output must match input.
+  def test_reshuffle(self):
+    with self.create_pipeline() as p:
+      assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
+                  equal_to([1, 2, 3]))
+
   def test_flatten(self):
     with self.create_pipeline() as p:
       res = (p | 'a' >> beam.Create(['a']),