You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/10 11:48:00 UTC

[jira] [Work logged] (BEAM-5702) Avoid reshuffle for zero and one element creates

     [ https://issues.apache.org/jira/browse/BEAM-5702?focusedWorklogId=153109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153109 ]

ASF GitHub Bot logged work on BEAM-5702:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Oct/18 11:47
            Start Date: 10/Oct/18 11:47
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6582: [BEAM-5702] Special case zero and one element Creates.
URL: https://github.com/apache/beam/pull/6582
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 5095e48d802..502ebd99fc9 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -47,4 +47,4 @@ def get_replacement_transform(self, ptransform):
     from apache_beam.runners.dataflow.native_io.streaming_create import \
       StreamingCreate
     coder = typecoders.registry.get_coder(ptransform.get_output_type())
-    return StreamingCreate(ptransform.value, coder)
+    return StreamingCreate(ptransform.values, coder)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ef6bb750b5f..b00ba21ee28 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -38,6 +38,7 @@
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
 from apache_beam.internal import util
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
@@ -1942,19 +1943,19 @@ def from_runner_api_parameter(unused_parameter, unused_context):
 class Create(PTransform):
   """A transform that creates a PCollection from an iterable."""
 
-  def __init__(self, value):
+  def __init__(self, values):
     """Initializes a Create transform.
 
     Args:
-      value: An object of values for the PCollection
+      values: An object of values for the PCollection
     """
     super(Create, self).__init__()
-    if isinstance(value, (unicode, str, bytes)):
+    if isinstance(values, (unicode, str, bytes)):
       raise TypeError('PTransform Create: Refusing to treat string as '
-                      'an iterable. (string=%r)' % value)
-    elif isinstance(value, dict):
-      value = value.items()
-    self.value = tuple(value)
+                      'an iterable. (string=%r)' % values)
+    elif isinstance(values, dict):
+      values = values.items()
+    self.values = tuple(values)
 
   def to_runner_api_parameter(self, context):
     # Required as this is identified by type in PTransformOverrides.
@@ -1962,9 +1963,9 @@ def to_runner_api_parameter(self, context):
     return self.to_runner_api_pickled(context)
 
   def infer_output_type(self, unused_input_type):
-    if not self.value:
+    if not self.values:
       return Any
-    return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
+    return Union[[trivial_inference.instance_to_type(v) for v in self.values]]
 
   def get_output_type(self):
     return (self.get_type_hints().simple_output_type(self.label) or
@@ -1975,9 +1976,25 @@ def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
     coder = typecoders.registry.get_coder(self.get_output_type())
-    source = self._create_source_from_iterable(self.value, coder)
-    return (pbegin.pipeline
-            | iobase.Read(source).with_output_types(self.get_output_type()))
+    debug_options = self.pipeline._options.view_as(DebugOptions)
+    # Must guard against this as some legacy runners don't implement impulse.
+    fn_api = (debug_options.experiments
+              and 'beam_fn_api' in debug_options.experiments)
+    # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
+    # These special cases are often used in building up more complex
+    # transforms (e.g. Write).
+    if fn_api and len(self.values) == 0:
+      return pbegin | Impulse() | FlatMap(
+          lambda _: ()).with_output_types(self.get_output_type())
+    elif fn_api and len(self.values) == 1:
+      serialized_value = coder.encode(self.values[0])
+      return pbegin | Impulse() | Map(
+          lambda _: coder.decode(serialized_value)).with_output_types(
+              self.get_output_type())
+    else:
+      source = self._create_source_from_iterable(self.values, coder)
+      return (pbegin.pipeline
+              | iobase.Read(source).with_output_types(self.get_output_type()))
 
   def get_windowing(self, unused_inputs):
     return Windowing(GlobalWindows())
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index ada36725179..915056f7d5f 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -37,6 +37,8 @@ def setUp(self):
 
   def test_create_transform(self):
     with TestPipeline() as p:
+      assert_that(p | 'Empty' >> Create([]), equal_to([]), label='empty')
+      assert_that(p | 'One' >> Create([None]), equal_to([None]), label='one')
       assert_that(p | Create(list(range(10))), equal_to(list(range(10))))
 
   def test_create_source_read(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 153109)
            Time Spent: 10m
    Remaining Estimate: 0h

> Avoid reshuffle for zero and one element creates
> ------------------------------------------------
>
>                 Key: BEAM-5702
>                 URL: https://issues.apache.org/jira/browse/BEAM-5702
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> These are commonly used (e.g. for Writes or CombineGlobally with Default) and can be implemented directly on top of Impulse rather than incurring a reshuffle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)