You are viewing a plain-text version of this content. The link to the canonical version (labelled "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:56 UTC
[6/9] incubator-beam git commit: More complicated window tests.
More complicated window tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989a189d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989a189d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989a189d
Branch: refs/heads/python-sdk
Commit: 989a189d6495163bb4f80e310834ef927dbefef5
Parents: 7386bcc
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 13 18:02:12 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 2 +-
.../python/apache_beam/transforms/sideinputs.py | 2 +-
.../apache_beam/transforms/sideinputs_test.py | 69 +++++++++++++++++++-
3 files changed, 69 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 86fd819..cd06879 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -150,7 +150,7 @@ class DoFnRunner(Receiver):
if self.has_windowed_side_inputs and len(element.windows) > 1:
for w in element.windows:
self.context.set_element(
- WindowedValue(element.value, element.timestamp, w))
+ WindowedValue(element.value, element.timestamp, (w,)))
self._process_outputs(element, self.dofn_process(self.context))
else:
self.context.set_element(element)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 00c2852..2079c93 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -170,7 +170,7 @@ def default_window_mapping_fn(target_window_fn):
else:
def map_via_end(source_window):
return list(target_window_fn.assign(
- window.WindowFn.AssignContext(source_window.max_timestamp())))[0]
+ window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
return map_via_end
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 8b5b4e0..f84ff57 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -38,9 +38,13 @@ class SideInputsTest(unittest.TestCase):
main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)
side = pcoll | 'WindowSide' >> beam.WindowInto(
side_window_fn or main_window_fn)
+ kw = {}
if combine_fn is not None:
- side |= beam.CombineGlobally(combine_fn)
- res = main | beam.Map(lambda x, s: (x, s), side_input_type(side))
+ side |= beam.CombineGlobally(combine_fn).without_defaults()
+ kw['default_value'] = 0
+ elif side_input_type == beam.pvalue.AsDict:
+ side |= beam.Map(lambda x: ('k%s' % x, 'v%s' % x))
+ res = main | beam.Map(lambda x, s: (x, s), side_input_type(side, **kw))
if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
res |= beam.Map(lambda (x, s): (x, sorted(s)))
assert_that(res, equal_to(expected))
@@ -57,6 +61,67 @@ class SideInputsTest(unittest.TestCase):
window.FixedWindows(10),
expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
+ def test_different_fixed_windows(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 11, 21, 31],
+ window.FixedWindows(10),
+ window.FixedWindows(20),
+ expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11]),
+ (21, [21, 31]), (31, [21, 31])])
+
+ def test_fixed_global_window(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 11],
+ window.FixedWindows(10),
+ window.GlobalWindows(),
+ expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11])])
+
+ def test_sliding_windows(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 4],
+ window.SlidingWindows(size=6, period=2),
+ window.SlidingWindows(size=6, period=2),
+ expected=[
+ # Element 1 falls in three windows
+ (1, [1]), # [-4, 2)
+ (1, [1, 2]), # [-2, 4)
+ (1, [1, 2, 4]), # [0, 6)
+ # as does 2,
+ (2, [1, 2]), # [-2, 4)
+ (2, [1, 2, 4]), # [0, 6)
+ (2, [2, 4]), # [2, 8)
+ # and 4.
+ (4, [1, 2, 4]), # [0, 6)
+ (4, [2, 4]), # [2, 8)
+ (4, [4]), # [4, 10)
+ ])
+
+ def test_windowed_iter(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 11],
+ window.FixedWindows(10),
+ side_input_type=beam.pvalue.AsIter,
+ expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
+
+ def test_windowed_singleton(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 11],
+ window.FixedWindows(10),
+ side_input_type=beam.pvalue.AsSingleton,
+ combine_fn=sum,
+ expected=[(1, 3), (2, 3), (11, 11)])
+
+ def test_windowed_dict(self):
+ self.run_windowed_side_inputs(
+ [1, 2, 11],
+ window.FixedWindows(10),
+ side_input_type=beam.pvalue.AsDict,
+ expected=[
+ (1, {'k1': 'v1', 'k2': 'v2'}),
+ (2, {'k1': 'v1', 'k2': 'v2'}),
+ (11, {'k11': 'v11'}),
+ ])
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)