You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/23 22:38:01 UTC
[2/2] beam git commit: Code cleanup now that all runners support
windowed side inputs.
Code cleanup now that all runners support windowed side inputs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6cb2f37e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6cb2f37e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6cb2f37e
Branch: refs/heads/python-sdk
Commit: 6cb2f37efadfb52138b125fcaf51e703c2c5fd5a
Parents: deb2aea
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 21:13:36 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jan 23 14:37:45 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 34 ++++++++++++++------------
1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6cb2f37e/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0f63cbc..9c8fdfc 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -24,7 +24,6 @@ import sys
from apache_beam.internal import util
from apache_beam.pvalue import SideOutputValue
from apache_beam.transforms import core
-from apache_beam.transforms import sideinputs
from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
@@ -71,6 +70,21 @@ class DoFnRunner(Receiver):
# Preferred alternative to context
# TODO(robertwb): Remove once all runners are updated.
state=None):
+ """Initializes a DoFnRunner.
+
+ Args:
+ fn: user DoFn to invoke
+ args: positional side input arguments (static and placeholder), if any
+ kwargs: keyword side input arguments (static and placeholder), if any
+ side_inputs: list of sideinputs.SideInputMap objects for deferred side inputs
+ windowing: windowing properties of the output PCollection(s)
+ context: a DoFnContext to use (deprecated)
+ tagged_receivers: a dict of tag name to Receiver objects
+ logger: a logging module (deprecated)
+ step_name: the name of this step
+ logging_context: a LoggingContext object
+ state: handle for accessing DoFn state
+ """
self.step_name = step_name
self.window_fn = windowing.windowfn
self.tagged_receivers = tagged_receivers
@@ -97,14 +111,10 @@ class DoFnRunner(Receiver):
if isinstance(fn, core.NewDoFn):
self.is_new_dofn = True
- # SideInputs
- self.side_inputs = [side_input
- if isinstance(side_input, sideinputs.SideInputMap)
- else {global_window: side_input}
- for side_input in side_inputs]
+ # Stash values for use in new_dofn_process.
+ self.side_inputs = side_inputs
self.has_windowed_side_inputs = not all(
- isinstance(si, dict) or si.is_globally_windowed()
- for si in self.side_inputs)
+ si.is_globally_windowed() for si in self.side_inputs)
self.args = args if args else []
self.kwargs = kwargs if kwargs else {}
@@ -117,14 +127,8 @@ class DoFnRunner(Receiver):
self.dofn = fn
self.dofn_process = fn.process
else:
- # TODO(robertwb): Remove when all runners pass side input maps.
- side_inputs = [side_input
- if isinstance(side_input, sideinputs.SideInputMap)
- else {global_window: side_input}
- for side_input in side_inputs]
if side_inputs and all(
- isinstance(side_input, dict) or side_input.is_globally_windowed()
- for side_input in side_inputs):
+ side_input.is_globally_windowed() for side_input in side_inputs):
args, kwargs = util.insert_values_in_args(
args, kwargs, [side_input[global_window]
for side_input in side_inputs])