You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:54 UTC
[4/9] incubator-beam git commit: Minor fixups for better testing
Minor fixups for better testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a50efccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a50efccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a50efccc
Branch: refs/heads/python-sdk
Commit: a50efccc4b79e95cedf49d2680df67dd2b587927
Parents: 82f553e
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Oct 17 17:50:32 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 5 +++--
sdks/python/apache_beam/transforms/sideinputs.py | 7 ++++++-
sdks/python/apache_beam/transforms/sideinputs_test.py | 5 ++++-
3 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e500fd8..cc834ba 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -82,8 +82,9 @@ class DoFnRunner(Receiver):
if isinstance(side_input, sideinputs.SideInputMap)
else {global_window: side_input}
for side_input in side_inputs]
- if side_inputs and all(side_input.is_globally_windowed()
- for side_input in side_inputs):
+ if side_inputs and all(
+ isinstance(side_input, dict) or side_input.is_globally_windowed()
+ for side_input in side_inputs):
args, kwargs = util.insert_values_in_args(
args, kwargs, [side_input[global_window]
for side_input in side_inputs])
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 182179a..f3a7178 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -178,7 +178,8 @@ class SideInputMap(object):
"""Represents a mapping of windows to side input values."""
def __init__(self, view_class, view_options, iterable):
- self._window_mapping_fn = view_options['window_mapping_fn']
+ self._window_mapping_fn = view_options.get(
+ 'window_mapping_fn', _global_window_mapping_fn)
self._view_class = view_class
self._view_options = view_options
self._iterable = iterable
@@ -207,3 +208,7 @@ class _FilteringIterable(object):
for wv in self._iterable:
if self._target_window in wv.windows:
yield wv.value
+
+ def __reduce__(self):
+ # Pickle self as an already filtered list.
+ return list, (list(self),)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 28e324a..caf3652 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -27,12 +27,15 @@ from apache_beam.transforms.util import assert_that, equal_to
class SideInputsTest(unittest.TestCase):
+ def create_pipeline(self):
+ return beam.Pipeline('DirectPipelineRunner')
+
def run_windowed_side_inputs(self, elements, main_window_fn,
side_window_fn=None,
side_input_type=beam.pvalue.AsList,
combine_fn=None,
expected=None):
- with beam.Pipeline('DirectPipelineRunner') as p:
+ with self.create_pipeline() as p:
pcoll = p | beam.Create(elements) | beam.Map(
lambda t: window.TimestampedValue(t, t))
main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)