You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:52 UTC
[2/9] incubator-beam git commit: Implement windowed side inputs for direct runner.
Implement windowed side inputs for direct runner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29a73789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29a73789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29a73789
Branch: refs/heads/python-sdk
Commit: 29a73789c787588dd35edc7964b398961c627cdf
Parents: 9007376
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 15:27:58 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:15 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pvalue.py | 44 ++++++++++---
sdks/python/apache_beam/runners/common.pxd | 1 +
sdks/python/apache_beam/runners/common.py | 42 ++++++++----
.../python/apache_beam/runners/direct_runner.py | 25 +-------
.../inprocess/inprocess_evaluation_context.py | 8 +--
.../python/apache_beam/transforms/sideinputs.py | 67 +++++++++++++++++---
.../apache_beam/transforms/sideinputs_test.py | 8 ---
sdks/python/apache_beam/transforms/timeutil.py | 4 ++
sdks/python/apache_beam/transforms/window.py | 3 +
9 files changed, 139 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 063d0b5..f8b7feb 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -27,6 +27,7 @@ produced when the pipeline gets executed.
from __future__ import absolute_import
import collections
+import itertools
class PValue(object):
@@ -227,9 +228,10 @@ class SideOutputValue(object):
class PCollectionView(PValue):
"""An immutable view of a PCollection that can be used as a side input."""
- def __init__(self, pipeline):
+ def __init__(self, pipeline, window_mapping_fn):
"""Initializes a PCollectionView. Do not call directly."""
super(PCollectionView, self).__init__(pipeline)
+ self._window_mapping_fn = window_mapping_fn
@property
def windowing(self):
@@ -246,34 +248,60 @@ class PCollectionView(PValue):
Returns:
Tuple of options for the given view.
"""
- return ()
+ return {'window_mapping_fn': self._window_mapping_fn}
class SingletonPCollectionView(PCollectionView):
"""A PCollectionView that contains a single object."""
- def __init__(self, pipeline, has_default, default_value):
- super(SingletonPCollectionView, self).__init__(pipeline)
+ def __init__(self, pipeline, has_default, default_value,
+ window_mapping_fn):
+ super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn)
self.has_default = has_default
self.default_value = default_value
def _view_options(self):
- return (self.has_default, self.default_value)
+ base = super(SingletonPCollectionView, self)._view_options()
+ if self.has_default:
+ return dict(base, default=self.default_value)
+ else:
+ return base
+
+ @staticmethod
+ def from_iterable(it, options):
+ head = list(itertools.islice(it, 2))
+ if len(head) == 0:
+ return options.get('default', EmptySideInput())
+ elif len(head) == 1:
+ return head[0]
+ else:
+ raise ValueError(
+ 'PCollection with more than one element accessed as '
+ 'a singleton view.')
class IterablePCollectionView(PCollectionView):
"""A PCollectionView that can be treated as an iterable."""
- pass
+
+ @staticmethod
+ def from_iterable(it, options):
+ return it
class ListPCollectionView(PCollectionView):
"""A PCollectionView that can be treated as a list."""
- pass
+
+ @staticmethod
+ def from_iterable(it, options):
+ return list(it)
class DictPCollectionView(PCollectionView):
"""A PCollectionView that can be treated as a dict."""
- pass
+
+ @staticmethod
+ def from_iterable(it, options):
+ return dict(it)
def _get_cached_view(pipeline, key):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5cd4cf8..085fd11 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -36,6 +36,7 @@ cdef class DoFnRunner(Receiver):
cdef object tagged_receivers
cdef LoggingContext logging_context
cdef object step_name
+ cdef bint has_windowed_side_inputs
cdef Receiver main_receivers
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 67277c3..86fd819 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -24,6 +24,8 @@ import sys
from apache_beam.internal import util
from apache_beam.pvalue import SideOutputValue
from apache_beam.transforms import core
+from apache_beam.transforms import sideinputs
+from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.windowed_value import WindowedValue
@@ -69,24 +71,36 @@ class DoFnRunner(Receiver):
# Preferred alternative to context
# TODO(robertwb): Remove once all runners are updated.
state=None):
+ self.has_windowed_side_inputs = False # Set to True in one case below.
if not args and not kwargs:
self.dofn = fn
self.dofn_process = fn.process
else:
- args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
+ # TODO(robertwb): Remove when all runners pass side input maps.
+ # TODO(robertwb): Optimize for global windows case.
+ side_inputs = [side_input if isinstance(side_input, sideinputs.SideInputMap)
+ else {window.GlobalWindow(): side_input}
+ for side_input in side_inputs]
+ if side_inputs:
+ self.has_windowed_side_inputs = True
+ def process(context):
+ w = context.windows[0]
+ cur_args, cur_kwargs = util.insert_values_in_args(
+ args, kwargs, [side_input[w] for side_input in side_inputs])
+ return fn.process(context, *cur_args, **cur_kwargs)
+ self.dofn_process = process
+ elif kwargs:
+ self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+ else:
+ self.dofn_process = lambda context: fn.process(context, *args)
class CurriedFn(core.DoFn):
- def start_bundle(self, context):
- return fn.start_bundle(context)
-
- def process(self, context):
- return fn.process(context, *args, **kwargs)
+ start_bundle = staticmethod(fn.start_bundle)
+ process = staticmethod(self.dofn_process)
+ finish_bundle = staticmethod(fn.finish_bundle)
- def finish_bundle(self, context):
- return fn.finish_bundle(context)
self.dofn = CurriedFn()
- self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
self.window_fn = windowing.windowfn
self.tagged_receivers = tagged_receivers
@@ -133,8 +147,14 @@ class DoFnRunner(Receiver):
def process(self, element):
try:
self.logging_context.enter()
- self.context.set_element(element)
- self._process_outputs(element, self.dofn_process(self.context))
+ if self.has_windowed_side_inputs and len(element.windows) > 1:
+ for w in element.windows:
+ self.context.set_element(
+ WindowedValue(element.value, element.timestamp, w))
+ self._process_outputs(element, self.dofn_process(self.context))
+ else:
+ self.context.set_element(element)
+ self._process_outputs(element, self.dofn_process(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
finally:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 656cc91..936043c 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -41,6 +41,7 @@ from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms import sideinputs
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowedValue
from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -107,29 +108,7 @@ class DirectPipelineRunner(PipelineRunner):
transform = transform_node.transform
view = transform.view
values = self._cache.get_pvalue(transform_node.inputs[0])
- if isinstance(view, SingletonPCollectionView):
- has_default, default_value = view._view_options() # pylint: disable=protected-access
- if len(values) == 0:
- if has_default:
- result = default_value
- else:
- result = EmptySideInput()
- elif len(values) == 1:
- # TODO(ccy): Figure out whether side inputs should ever be given as
- # windowed values
- result = values[0].value
- else:
- raise ValueError(('PCollection with more than one element accessed as '
- 'a singleton view: %s.') % view)
- elif isinstance(view, IterablePCollectionView):
- result = [v.value for v in values]
- elif isinstance(view, ListPCollectionView):
- result = [v.value for v in values]
- elif isinstance(view, DictPCollectionView):
- result = dict(v.value for v in values)
- else:
- raise NotImplementedError
-
+ result = sideinputs.SideInputMap(type(view), view._view_options(), values)
self._cache.cache_output(transform_node, result)
@skip_if_cached
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index 9c8b695..883be99 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -27,6 +27,7 @@ from apache_beam.pvalue import EmptySideInput
from apache_beam.pvalue import IterablePCollectionView
from apache_beam.pvalue import ListPCollectionView
from apache_beam.pvalue import SingletonPCollectionView
+from apache_beam.transforms import sideinputs
from apache_beam.runners.inprocess.clock import Clock
from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
@@ -106,12 +107,9 @@ class _InProcessSideInputsContainer(object):
ValueError: If values cannot be converted into the requested form.
"""
if isinstance(view, SingletonPCollectionView):
- has_default, default_value = view._view_options() # pylint: disable=protected-access
if len(values) == 0:
- if has_default:
- result = default_value
- else:
- result = EmptySideInput()
+ # pylint: disable=protected-access
+ result = view._view_options().get('default', EmptySideInput())
elif len(values) == 1:
result = values[0].value
else:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 6c698da..00c2852 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms import window
# Type variables
K = typehints.TypeVariable('K')
@@ -50,10 +51,6 @@ class CreatePCollectionView(PTransform):
return input_type
def apply(self, pcoll):
- if not pcoll.windowing.is_default():
- raise ValueError(
- "Side inputs only supported for global windows, default triggering. "
- "Found %s" % pcoll.windowing)
return self.view
@@ -78,7 +75,8 @@ class ViewAsSingleton(PTransform):
return (pcoll
| CreatePCollectionView(
pvalue.SingletonPCollectionView(
- pcoll.pipeline, self.has_default, self.default_value))
+ pcoll.pipeline, self.has_default, self.default_value,
+ default_window_mapping_fn(pcoll.windowing.windowfn)))
.with_input_types(input_type)
.with_output_types(output_type))
@@ -101,7 +99,9 @@ class ViewAsIterable(PTransform):
output_type = typehints.Iterable[input_type]
return (pcoll
| CreatePCollectionView(
- pvalue.IterablePCollectionView(pcoll.pipeline))
+ pvalue.IterablePCollectionView(
+ pcoll.pipeline,
+ default_window_mapping_fn(pcoll.windowing.windowfn)))
.with_input_types(input_type)
.with_output_types(output_type))
@@ -123,7 +123,9 @@ class ViewAsList(PTransform):
input_type = pcoll.element_type
output_type = typehints.List[input_type]
return (pcoll
- | CreatePCollectionView(pvalue.ListPCollectionView(pcoll.pipeline))
+ | CreatePCollectionView(pvalue.ListPCollectionView(
+ pcoll.pipeline,
+ default_window_mapping_fn(pcoll.windowing.windowfn)))
.with_input_types(input_type)
.with_output_types(output_type))
@@ -150,6 +152,55 @@ class ViewAsDict(PTransform):
output_type = typehints.Dict[key_type, value_type]
return (pcoll
| CreatePCollectionView(
- pvalue.DictPCollectionView(pcoll.pipeline))
+ pvalue.DictPCollectionView(
+ pcoll.pipeline,
+ default_window_mapping_fn(pcoll.windowing.windowfn)))
.with_input_types(input_type)
.with_output_types(output_type))
+
+
+# Top-level function so we can identify it later.
+def _global_window_mapping_fn(w, global_window=window.GlobalWindow()):
+ return global_window
+
+
+def default_window_mapping_fn(target_window_fn):
+ if target_window_fn == window.GlobalWindows():
+ return _global_window_mapping_fn
+ else:
+ def map_via_end(source_window):
+ return list(target_window_fn.assign(
+ window.WindowFn.AssignContext(source_window.max_timestamp())))[0]
+ return map_via_end
+
+
+class SideInputMap(object):
+ """Represents a mapping of windows to side input values."""
+
+ def __init__(self, view_class, view_options, iterable):
+ self._window_mapping_fn = view_options['window_mapping_fn']
+ self._view_class = view_class
+ self._view_options = view_options
+ self._iterable = iterable
+ self._cache = {}
+
+ def __getitem__(self, window):
+ if window not in self._cache:
+ target_window = self._window_mapping_fn(window)
+ self._cache[window] = self._view_class.from_iterable(
+ _FilteringIterable(self._iterable, target_window), self._view_options)
+ return self._cache[window]
+
+
+class _FilteringIterable(object):
+ """An iterable containing only those values in the given window.
+ """
+
+ def __init__(self, iterable, target_window):
+ self._iterable = iterable
+ self._target_window = target_window
+
+ def __iter__(self):
+ for wv in self._iterable:
+ if self._target_window in wv.windows:
+ yield wv.value
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 68deba8..8b5b4e0 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -27,14 +27,6 @@ from apache_beam.transforms.util import assert_that, equal_to
class SideInputsTest(unittest.TestCase):
- # TODO(BEAM-733): Actually support this.
- def test_no_sideinput_windowing(self):
- p = beam.Pipeline('DirectPipelineRunner')
- pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10))
- with self.assertRaises(ValueError):
- # pylint: disable=expression-not-assigned
- pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc))
-
def run_windowed_side_inputs(self, elements, main_window_fn,
side_window_fn=None,
side_input_type=beam.pvalue.AsList,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 4092b60..f026d83 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -58,6 +58,10 @@ class Timestamp(object):
return seconds
return Timestamp(seconds)
+ def predecessor(self):
+ """Returns the largest timestamp smaller than self."""
+ return Timestamp(micros=self.micros - 1)
+
def __repr__(self):
micros = self.micros
sign = ''
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index e07814d..9485032 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -143,6 +143,9 @@ class BoundedWindow(object):
def __init__(self, end):
self.end = Timestamp.of(end)
+ def max_timestamp(self):
+ return self.end.predecessor()
+
def __cmp__(self, other):
# Order first by endpoint, then arbitrarily.
return cmp(self.end, other.end) or cmp(hash(self), hash(other))