You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:52 UTC

[2/9] incubator-beam git commit: Implement windowed side inputs for direct runner.

Implement windowed side inputs for direct runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29a73789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29a73789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29a73789

Branch: refs/heads/python-sdk
Commit: 29a73789c787588dd35edc7964b398961c627cdf
Parents: 9007376
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 15:27:58 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:15 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pvalue.py               | 44 ++++++++++---
 sdks/python/apache_beam/runners/common.pxd      |  1 +
 sdks/python/apache_beam/runners/common.py       | 42 ++++++++----
 .../python/apache_beam/runners/direct_runner.py | 25 +-------
 .../inprocess/inprocess_evaluation_context.py   |  8 +--
 .../python/apache_beam/transforms/sideinputs.py | 67 +++++++++++++++++---
 .../apache_beam/transforms/sideinputs_test.py   |  8 ---
 sdks/python/apache_beam/transforms/timeutil.py  |  4 ++
 sdks/python/apache_beam/transforms/window.py    |  3 +
 9 files changed, 139 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 063d0b5..f8b7feb 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -27,6 +27,7 @@ produced when the pipeline gets executed.
 from __future__ import absolute_import
 
 import collections
+import itertools
 
 
 class PValue(object):
@@ -227,9 +228,10 @@ class SideOutputValue(object):
 class PCollectionView(PValue):
   """An immutable view of a PCollection that can be used as a side input."""
 
-  def __init__(self, pipeline):
+  def __init__(self, pipeline, window_mapping_fn):
     """Initializes a PCollectionView. Do not call directly."""
     super(PCollectionView, self).__init__(pipeline)
+    self._window_mapping_fn = window_mapping_fn
 
   @property
   def windowing(self):
@@ -246,34 +248,60 @@ class PCollectionView(PValue):
     Returns:
-      Tuple of options for the given view.
+      Dict of options for the given view.
     """
-    return ()
+    return {'window_mapping_fn': self._window_mapping_fn}

 
 class SingletonPCollectionView(PCollectionView):
   """A PCollectionView that contains a single object."""
 
-  def __init__(self, pipeline, has_default, default_value):
-    super(SingletonPCollectionView, self).__init__(pipeline)
+  def __init__(self, pipeline, has_default, default_value,
+               window_mapping_fn):
+    super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn)
     self.has_default = has_default
     self.default_value = default_value
 
   def _view_options(self):
-    return (self.has_default, self.default_value)
+    base = super(SingletonPCollectionView, self)._view_options()
+    if self.has_default:
+      return dict(base, default=self.default_value)
+    else:
+      return base
+
+  @staticmethod
+  def from_iterable(it, options):
+    head = list(itertools.islice(it, 2))
+    if len(head) == 0:
+      return options.get('default', EmptySideInput())
+    elif len(head) == 1:
+      return head[0]
+    else:
+      raise ValueError(
+        'PCollection with more than one element accessed as '
+        'a singleton view.')
 
 
 class IterablePCollectionView(PCollectionView):
   """A PCollectionView that can be treated as an iterable."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return it
 
 
 class ListPCollectionView(PCollectionView):
   """A PCollectionView that can be treated as a list."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return list(it)
 
 
 class DictPCollectionView(PCollectionView):
   """A PCollectionView that can be treated as a dict."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return dict(it)
 
 
 def _get_cached_view(pipeline, key):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5cd4cf8..085fd11 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -36,6 +36,7 @@ cdef class DoFnRunner(Receiver):
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
+  cdef bint has_windowed_side_inputs
 
   cdef Receiver main_receivers
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 67277c3..86fd819 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -24,6 +24,8 @@ import sys
 from apache_beam.internal import util
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
+from apache_beam.transforms import sideinputs
+from apache_beam.transforms import window
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
@@ -69,24 +71,36 @@ class DoFnRunner(Receiver):
                # Preferred alternative to context
                # TODO(robertwb): Remove once all runners are updated.
                state=None):
+    self.has_windowed_side_inputs = False  # Set to True in one case below.
     if not args and not kwargs:
       self.dofn = fn
       self.dofn_process = fn.process
     else:
-      args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
+      # TODO(robertwb): Remove when all runners pass side input maps.
+      # TODO(robertwb): Optimize for global windows case.
+      side_inputs = [si if isinstance(si, sideinputs.SideInputMap)
+                     else {window.GlobalWindow(): si}
+                     for si in side_inputs]
+      if side_inputs:
+        self.has_windowed_side_inputs = True
+        def process(context):
+          w = context.windows[0]
+          cur_args, cur_kwargs = util.insert_values_in_args(
+            args, kwargs, [side_input[w] for side_input in side_inputs])
+          return fn.process(context, *cur_args, **cur_kwargs)
+        self.dofn_process = process
+      elif kwargs:
+        self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+      else:
+        self.dofn_process = lambda context: fn.process(context, *args)
 
       class CurriedFn(core.DoFn):
 
-        def start_bundle(self, context):
-          return fn.start_bundle(context)
-
-        def process(self, context):
-          return fn.process(context, *args, **kwargs)
+        start_bundle = staticmethod(fn.start_bundle)
+        process = staticmethod(self.dofn_process)
+        finish_bundle = staticmethod(fn.finish_bundle)
 
-        def finish_bundle(self, context):
-          return fn.finish_bundle(context)
       self.dofn = CurriedFn()
-      self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
 
     self.window_fn = windowing.windowfn
     self.tagged_receivers = tagged_receivers
@@ -133,8 +147,14 @@ class DoFnRunner(Receiver):
   def process(self, element):
     try:
       self.logging_context.enter()
-      self.context.set_element(element)
-      self._process_outputs(element, self.dofn_process(self.context))
+      if self.has_windowed_side_inputs and len(element.windows) > 1:
+        for w in element.windows:  # Explode into single-window values.
+          exploded = WindowedValue(element.value, element.timestamp, (w,))
+          self.context.set_element(exploded)
+          self._process_outputs(exploded, self.dofn_process(self.context))
+      else:
+        self.context.set_element(element)
+        self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 656cc91..936043c 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -41,6 +41,7 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -107,29 +108,7 @@ class DirectPipelineRunner(PipelineRunner):
     transform = transform_node.transform
     view = transform.view
     values = self._cache.get_pvalue(transform_node.inputs[0])
-    if isinstance(view, SingletonPCollectionView):
-      has_default, default_value = view._view_options()  # pylint: disable=protected-access
-      if len(values) == 0:
-        if has_default:
-          result = default_value
-        else:
-          result = EmptySideInput()
-      elif len(values) == 1:
-        # TODO(ccy): Figure out whether side inputs should ever be given as
-        # windowed values
-        result = values[0].value
-      else:
-        raise ValueError(('PCollection with more than one element accessed as '
-                          'a singleton view: %s.') % view)
-    elif isinstance(view, IterablePCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, ListPCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, DictPCollectionView):
-      result = dict(v.value for v in values)
-    else:
-      raise NotImplementedError
-
+    result = sideinputs.SideInputMap(type(view), view._view_options(), values)
     self._cache.cache_output(transform_node, result)
 
   @skip_if_cached

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index 9c8b695..883be99 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -27,6 +27,7 @@ from apache_beam.pvalue import EmptySideInput
 from apache_beam.pvalue import IterablePCollectionView
 from apache_beam.pvalue import ListPCollectionView
 from apache_beam.pvalue import SingletonPCollectionView
+from apache_beam.transforms import sideinputs
 from apache_beam.runners.inprocess.clock import Clock
 from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
 from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
@@ -106,12 +107,9 @@ class _InProcessSideInputsContainer(object):
       ValueError: If values cannot be converted into the requested form.
     """
     if isinstance(view, SingletonPCollectionView):
-      has_default, default_value = view._view_options()  # pylint: disable=protected-access
       if len(values) == 0:
-        if has_default:
-          result = default_value
-        else:
-          result = EmptySideInput()
+        # pylint: disable=protected-access
+        result = view._view_options().get('default', EmptySideInput())
       elif len(values) == 1:
         result = values[0].value
       else:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 6c698da..00c2852 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms import window
 
 # Type variables
 K = typehints.TypeVariable('K')
@@ -50,10 +51,6 @@ class CreatePCollectionView(PTransform):
     return input_type
 
   def apply(self, pcoll):
-    if not pcoll.windowing.is_default():
-      raise ValueError(
-          "Side inputs only supported for global windows, default triggering. "
-          "Found %s" % pcoll.windowing)
     return self.view
 
 
@@ -78,7 +75,8 @@ class ViewAsSingleton(PTransform):
     return (pcoll
             | CreatePCollectionView(
                 pvalue.SingletonPCollectionView(
-                    pcoll.pipeline, self.has_default, self.default_value))
+                    pcoll.pipeline, self.has_default, self.default_value,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -101,7 +99,9 @@ class ViewAsIterable(PTransform):
     output_type = typehints.Iterable[input_type]
     return (pcoll
             | CreatePCollectionView(
-                pvalue.IterablePCollectionView(pcoll.pipeline))
+                pvalue.IterablePCollectionView(
+                    pcoll.pipeline,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -123,7 +123,9 @@ class ViewAsList(PTransform):
     input_type = pcoll.element_type
     output_type = typehints.List[input_type]
     return (pcoll
-            | CreatePCollectionView(pvalue.ListPCollectionView(pcoll.pipeline))
+            | CreatePCollectionView(pvalue.ListPCollectionView(
+                pcoll.pipeline,
+                default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -150,6 +152,55 @@ class ViewAsDict(PTransform):
     output_type = typehints.Dict[key_type, value_type]
     return (pcoll
             | CreatePCollectionView(
-                pvalue.DictPCollectionView(pcoll.pipeline))
+                pvalue.DictPCollectionView(
+                    pcoll.pipeline,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
+
+
+# Top-level function so we can identify it later.
+def _global_window_mapping_fn(w, global_window=window.GlobalWindow()):
+  return global_window
+
+
+def default_window_mapping_fn(target_window_fn):
+  if target_window_fn == window.GlobalWindows():
+    return _global_window_mapping_fn
+  else:
+    def map_via_end(source_window):
+      return list(target_window_fn.assign(
+        window.WindowFn.AssignContext(source_window.max_timestamp())))[0]
+    return map_via_end
+
+
+class SideInputMap(object):
+  """Represents a mapping of windows to side input values."""
+
+  def __init__(self, view_class, view_options, iterable):
+    self._window_mapping_fn = view_options['window_mapping_fn']
+    self._view_class = view_class
+    self._view_options = view_options
+    self._iterable = iterable
+    self._cache = {}
+
+  def __getitem__(self, main_window):
+    """Returns the view's value for the side window mapped from main_window."""
+    if main_window not in self._cache:
+      target_window = self._window_mapping_fn(main_window)
+      self._cache[main_window] = self._view_class.from_iterable(
+        _FilteringIterable(self._iterable, target_window), self._view_options)
+    return self._cache[main_window]
+
+
+class _FilteringIterable(object):
+  """An iterable containing only those values in the given window."""
+
+  def __init__(self, iterable, target_window):
+    self._iterable = iterable
+    self._target_window = target_window
+
+  def __iter__(self):
+    for wv in self._iterable:
+      if self._target_window in wv.windows:
+        yield wv.value

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 68deba8..8b5b4e0 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -27,14 +27,6 @@ from apache_beam.transforms.util import assert_that, equal_to
 
 class SideInputsTest(unittest.TestCase):
 
-  # TODO(BEAM-733): Actually support this.
-  def test_no_sideinput_windowing(self):
-    p = beam.Pipeline('DirectPipelineRunner')
-    pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10))
-    with self.assertRaises(ValueError):
-      # pylint: disable=expression-not-assigned
-      pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc))
-
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,
                                side_input_type=beam.pvalue.AsList,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 4092b60..f026d83 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -58,6 +58,10 @@ class Timestamp(object):
       return seconds
     return Timestamp(seconds)
 
+  def predecessor(self):
+    """Returns the largest timestamp smaller than self."""
+    return Timestamp(micros=self.micros - 1)
+
   def __repr__(self):
     micros = self.micros
     sign = ''

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index e07814d..9485032 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -143,6 +143,9 @@ class BoundedWindow(object):
   def __init__(self, end):
     self.end = Timestamp.of(end)
 
+  def max_timestamp(self):
+    return self.end.predecessor()
+
   def __cmp__(self, other):
     # Order first by endpoint, then arbitrarily.
     return cmp(self.end, other.end) or cmp(hash(self), hash(other))