You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:54 UTC

[4/9] incubator-beam git commit: Minor fixups for better testing

Minor fixups for better testing


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a50efccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a50efccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a50efccc

Branch: refs/heads/python-sdk
Commit: a50efccc4b79e95cedf49d2680df67dd2b587927
Parents: 82f553e
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Oct 17 17:50:32 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py             | 5 +++--
 sdks/python/apache_beam/transforms/sideinputs.py      | 7 ++++++-
 sdks/python/apache_beam/transforms/sideinputs_test.py | 5 ++++-
 3 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e500fd8..cc834ba 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -82,8 +82,9 @@ class DoFnRunner(Receiver):
                      if isinstance(side_input, sideinputs.SideInputMap)
                      else {global_window: side_input}
                      for side_input in side_inputs]
-      if side_inputs and all(side_input.is_globally_windowed()
-                             for side_input in side_inputs):
+      if side_inputs and all(
+          isinstance(side_input, dict) or side_input.is_globally_windowed()
+          for side_input in side_inputs):
         args, kwargs = util.insert_values_in_args(
             args, kwargs, [side_input[global_window]
                            for side_input in side_inputs])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 182179a..f3a7178 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -178,7 +178,8 @@ class SideInputMap(object):
   """Represents a mapping of windows to side input values."""
 
   def __init__(self, view_class, view_options, iterable):
-    self._window_mapping_fn = view_options['window_mapping_fn']
+    self._window_mapping_fn = view_options.get(
+        'window_mapping_fn', _global_window_mapping_fn)
     self._view_class = view_class
     self._view_options = view_options
     self._iterable = iterable
@@ -207,3 +208,7 @@ class _FilteringIterable(object):
     for wv in self._iterable:
       if self._target_window in wv.windows:
         yield wv.value
+
+  def __reduce__(self):
+    # Pickle self as an already filtered list.
+    return list, (list(self),)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 28e324a..caf3652 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -27,12 +27,15 @@ from apache_beam.transforms.util import assert_that, equal_to
 
 class SideInputsTest(unittest.TestCase):
 
+  def create_pipeline(self):
+    return beam.Pipeline('DirectPipelineRunner')
+
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,
                                side_input_type=beam.pvalue.AsList,
                                combine_fn=None,
                                expected=None):
-    with beam.Pipeline('DirectPipelineRunner') as p:
+    with self.create_pipeline() as p:
       pcoll = p | beam.Create(elements) | beam.Map(
           lambda t: window.TimestampedValue(t, t))
       main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)