You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/27 01:07:18 UTC
[1/2] beam git commit: [BEAM-1316] start bundle should not output any
elements
Repository: beam
Updated Branches:
refs/heads/master 3961ce46c -> 26c61f414
[BEAM-1316] start bundle should not output any elements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1360c21d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1360c21d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1360c21d
Branch: refs/heads/master
Commit: 1360c21dfbe6131d514a96e2d5290aa2308825de
Parents: 3961ce4
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Apr 26 15:09:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Apr 26 18:07:05 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 68 ++++++++++++++------
.../apache_beam/transforms/ptransform_test.py | 44 +++++++++----
2 files changed, 81 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1360c21d/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 08071a6..e2a6949 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -161,8 +161,8 @@ class DoFnInvoker(object):
defaults = self.signature.start_bundle_method.defaults
args = [self.context if d == core.DoFn.ContextParam else d
for d in defaults]
- self.output_processor.process_outputs(
- None, self.signature.start_bundle_method.method_value(*args))
+ self.output_processor.start_bundle_outputs(
+ self.signature.start_bundle_method.method_value(*args))
def invoke_finish_bundle(self):
"""Invokes the DoFn.finish_bundle() method.
@@ -170,8 +170,8 @@ class DoFnInvoker(object):
defaults = self.signature.finish_bundle_method.defaults
args = [self.context if d == core.DoFn.ContextParam else d
for d in defaults]
- self.output_processor.process_outputs(
- None, self.signature.finish_bundle_method.method_value(*args))
+ self.output_processor.finish_bundle_outputs(
+ self.signature.finish_bundle_method.method_value(*args))
class SimpleInvoker(DoFnInvoker):
@@ -436,13 +436,14 @@ class OutputProcessor(object):
self.tagged_receivers = tagged_receivers
def process_outputs(self, windowed_input_element, results):
- """Dispatch the result of computation to the appropriate receivers.
+ """Dispatch the result of process computation to the appropriate receivers.
A value wrapped in a OutputValue object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
if results is None:
return
+
for result in results:
tag = None
if isinstance(result, OutputValue):
@@ -455,19 +456,6 @@ class OutputProcessor(object):
if (windowed_input_element is not None
and len(windowed_input_element.windows) != 1):
windowed_value.windows *= len(windowed_input_element.windows)
- elif windowed_input_element is None:
- # Start and finish have no element from which to grab context,
- # but may emit elements.
- if isinstance(result, TimestampedValue):
- value = result.value
- timestamp = result.timestamp
- assign_context = NoContext(value, timestamp)
- else:
- value = result
- timestamp = -1
- assign_context = NoContext(value)
- windowed_value = WindowedValue(
- value, timestamp, self.window_fn.assign(assign_context))
elif isinstance(result, TimestampedValue):
assign_context = WindowFn.AssignContext(result.timestamp, result.value)
windowed_value = WindowedValue(
@@ -482,6 +470,50 @@ class OutputProcessor(object):
else:
self.tagged_receivers[tag].output(windowed_value)
+ def start_bundle_outputs(self, results):
+ """Validate that start_bundle does not output any elements"""
+ if results is None:
+ return
+ raise RuntimeError(
+ 'Start Bundle should not output any elements but got %s' % results)
+
+ def finish_bundle_outputs(self, results):
+ """Dispatch the result of finish_bundle to the appropriate receivers.
+
+ A value wrapped in a OutputValue object will be unwrapped and
+ then dispatched to the appropriate indexed output.
+ """
+ if results is None:
+ return
+
+ for result in results:
+ tag = None
+ if isinstance(result, OutputValue):
+ tag = result.tag
+ if not isinstance(tag, basestring):
+ raise TypeError('In %s, tag %s is not a string' % (self, tag))
+ result = result.value
+
+ if isinstance(result, WindowedValue):
+ windowed_value = result
+ elif isinstance(result, TimestampedValue):
+ value = result.value
+ timestamp = result.timestamp
+ assign_context = NoContext(value, timestamp)
+ windowed_value = WindowedValue(
+ value, timestamp, self.window_fn.assign(assign_context))
+ else:
+ value = result
+ timestamp = -1
+ assign_context = NoContext(value)
+ windowed_value = WindowedValue(
+ value, timestamp, self.window_fn.assign(assign_context))
+
+ if tag is None:
+ self.main_receivers.receive(windowed_value)
+ else:
+ self.tagged_receivers[tag].output(windowed_value)
+
class NoContext(WindowFn.AssignContext):
"""An uninspectable WindowFn.AssignContext."""
http://git-wip-us.apache.org/repos/asf/beam/blob/1360c21d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 303dfb8..ae77227 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
import operator
import re
import unittest
+import mock
import hamcrest as hc
from nose.plugins.attrib import attr
@@ -46,6 +47,16 @@ from apache_beam.utils.pipeline_options import TypeOptions
# Disable frequent lint warning due to pipe operator for chaining transforms.
# pylint: disable=expression-not-assigned
+class MyDoFn(beam.DoFn):
+ def start_bundle(self):
+ pass
+
+ def process(self, element):
+ pass
+
+ def finish_bundle(self):
+ yield 'finish'
+
class PTransformTest(unittest.TestCase):
# Enable nose tests running in parallel
@@ -274,17 +285,7 @@ class PTransformTest(unittest.TestCase):
expected_error_prefix = 'FlatMap and ParDo must return an iterable.'
self.assertStartswith(cm.exception.message, expected_error_prefix)
- def test_do_fn_with_start_finish(self):
- class MyDoFn(beam.DoFn):
- def start_bundle(self):
- yield 'start'
-
- def process(self, element):
- pass
-
- def finish_bundle(self):
- yield 'finish'
-
+ def test_do_fn_with_finish(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
result = pcoll | 'Do' >> beam.ParDo(MyDoFn())
@@ -292,13 +293,30 @@ class PTransformTest(unittest.TestCase):
# May have many bundles, but each has a start and finish.
def matcher():
def match(actual):
- equal_to(['start', 'finish'])(list(set(actual)))
- equal_to([actual.count('start')])([actual.count('finish')])
+ equal_to(['finish'])(list(set(actual)))
+ equal_to([1])([actual.count('finish')])
return match
assert_that(result, matcher())
pipeline.run()
+ @mock.patch.object(MyDoFn, 'start_bundle')
+ def test_do_fn_with_start(self, mock_method):
+ mock_method.return_value = None
+ pipeline = TestPipeline()
+ pipeline | 'Start' >> beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(MyDoFn())
+ pipeline.run()
+ self.assertTrue(mock_method.called)
+
+ @mock.patch.object(MyDoFn, 'start_bundle')
+ def test_do_fn_with_start_error(self, mock_method):
+ mock_method.return_value = [1]
+ pipeline = TestPipeline()
+ pipeline | 'Start' >> beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(MyDoFn())
+ with self.assertRaises(RuntimeError):
+ pipeline.run()
+ self.assertTrue(mock_method.called)
+
def test_filter(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3, 4])
[2/2] beam git commit: This closes #2715
Posted by al...@apache.org.
This closes #2715
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/26c61f41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/26c61f41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/26c61f41
Branch: refs/heads/master
Commit: 26c61f4143678cf6c9cb283beec99365145e2223
Parents: 3961ce4 1360c21
Author: Ahmet Altay <al...@google.com>
Authored: Wed Apr 26 18:07:07 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Apr 26 18:07:07 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 68 ++++++++++++++------
.../apache_beam/transforms/ptransform_test.py | 44 +++++++++----
2 files changed, 81 insertions(+), 31 deletions(-)
----------------------------------------------------------------------