You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 23:03:19 UTC

[12/50] [abbrv] beam git commit: Code cleanup now that all runners support windowed side inputs.

Code cleanup now that all runners support windowed side inputs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6cb2f37e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6cb2f37e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6cb2f37e

Branch: refs/heads/master
Commit: 6cb2f37efadfb52138b125fcaf51e703c2c5fd5a
Parents: deb2aea
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 21:13:36 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jan 23 14:37:45 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py | 34 ++++++++++++++------------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6cb2f37e/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0f63cbc..9c8fdfc 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -24,7 +24,6 @@ import sys
 from apache_beam.internal import util
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
-from apache_beam.transforms import sideinputs
 from apache_beam.transforms import window
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
@@ -71,6 +70,21 @@ class DoFnRunner(Receiver):
                # Preferred alternative to context
                # TODO(robertwb): Remove once all runners are updated.
                state=None):
+    """Initializes a DoFnRunner.
+
+    Args:
+      fn: user DoFn to invoke
+      args: positional side input arguments (static and placeholder), if any
+      kwargs: keyword side input arguments (static and placeholder), if any
+      side_inputs: list of sideinputs.SideInputMaps for deferred side inputs
+      windowing: windowing properties of the output PCollection(s)
+      context: a DoFnContext to use (deprecated)
+      tagged_receivers: a dict of tag name to Receiver objects
+      logger: a logging module (deprecated)
+      step_name: the name of this step
+      logging_context: a LoggingContext object
+      state: handle for accessing DoFn state
+    """
     self.step_name = step_name
     self.window_fn = windowing.windowfn
     self.tagged_receivers = tagged_receivers
@@ -97,14 +111,10 @@ class DoFnRunner(Receiver):
     if isinstance(fn, core.NewDoFn):
       self.is_new_dofn = True
 
-      # SideInputs
-      self.side_inputs = [side_input
-                          if isinstance(side_input, sideinputs.SideInputMap)
-                          else {global_window: side_input}
-                          for side_input in side_inputs]
+      # Stash values for use in new_dofn_process.
+      self.side_inputs = side_inputs
       self.has_windowed_side_inputs = not all(
-          isinstance(si, dict) or si.is_globally_windowed()
-          for si in self.side_inputs)
+          si.is_globally_windowed() for si in self.side_inputs)
 
       self.args = args if args else []
       self.kwargs = kwargs if kwargs else {}
@@ -117,14 +127,8 @@ class DoFnRunner(Receiver):
         self.dofn = fn
         self.dofn_process = fn.process
       else:
-        # TODO(robertwb): Remove when all runners pass side input maps.
-        side_inputs = [side_input
-                       if isinstance(side_input, sideinputs.SideInputMap)
-                       else {global_window: side_input}
-                       for side_input in side_inputs]
         if side_inputs and all(
-            isinstance(side_input, dict) or side_input.is_globally_windowed()
-            for side_input in side_inputs):
+            side_input.is_globally_windowed() for side_input in side_inputs):
           args, kwargs = util.insert_values_in_args(
               args, kwargs, [side_input[global_window]
                              for side_input in side_inputs])