You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/21 20:44:16 UTC
[1/2] beam git commit: Allow production of unprocessed bundles, introduce TestStream evaluator in DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 28c6fd42e -> f0467b72f
Allow production of unprocessed bundles, introduce TestStream evaluator in DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3520f948
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3520f948
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3520f948
Branch: refs/heads/master
Commit: 3520f94882b00aa8db64f6379044689d1b78ac06
Parents: 28c6fd4
Author: Charles Chen <cc...@google.com>
Authored: Tue Jun 20 17:16:20 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 21 13:44:05 2017 -0700
----------------------------------------------------------------------
.../runners/direct/evaluation_context.py | 14 ++--
.../apache_beam/runners/direct/executor.py | 40 +++++++--
.../runners/direct/transform_evaluator.py | 88 ++++++++++++++++++--
sdks/python/apache_beam/runners/direct/util.py | 4 +-
.../runners/direct/watermark_manager.py | 11 ++-
sdks/python/apache_beam/testing/test_stream.py | 5 ++
.../apache_beam/testing/test_stream_test.py | 37 ++++++++
7 files changed, 176 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 976e9e8..669a68a 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -208,11 +208,12 @@ class EvaluationContext(object):
the committed bundles contained within the handled result.
"""
with self._lock:
- committed_bundles = self._commit_bundles(
- result.uncommitted_output_bundles)
+ committed_bundles, unprocessed_bundles = self._commit_bundles(
+ result.uncommitted_output_bundles,
+ result.unprocessed_bundles)
self._watermark_manager.update_watermarks(
completed_bundle, result.transform, completed_timers,
- committed_bundles, result.watermark_hold)
+ committed_bundles, unprocessed_bundles, result.watermark_hold)
self._metrics.commit_logical(completed_bundle,
result.logical_metric_updates)
@@ -252,14 +253,17 @@ class EvaluationContext(object):
executor_service.submit(task)
self._pending_unblocked_tasks = []
- def _commit_bundles(self, uncommitted_bundles):
+ def _commit_bundles(self, uncommitted_bundles, unprocessed_bundles):
"""Commits bundles and returns a immutable set of committed bundles."""
for in_progress_bundle in uncommitted_bundles:
producing_applied_ptransform = in_progress_bundle.pcollection.producer
watermarks = self._watermark_manager.get_watermarks(
producing_applied_ptransform)
in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
- return tuple(uncommitted_bundles)
+
+ for unprocessed_bundle in unprocessed_bundles:
+ unprocessed_bundle.commit(None)
+ return tuple(uncommitted_bundles), tuple(unprocessed_bundles)
def get_execution_context(self, applied_ptransform):
return _ExecutionContext(
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index a0a3886..e70e326 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -227,17 +227,25 @@ class _CompletionCallback(object):
self._all_updates = all_updates
self._timer_firings = timer_firings or []
- def handle_result(self, input_committed_bundle, transform_result):
+ def handle_result(self, transform_executor, input_committed_bundle,
+ transform_result):
output_committed_bundles = self._evaluation_context.handle_result(
input_committed_bundle, self._timer_firings, transform_result)
for output_committed_bundle in output_committed_bundles:
self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
- output_committed_bundle, None))
+ transform_executor,
+ committed_bundle=output_committed_bundle))
+ for unprocessed_bundle in transform_result.unprocessed_bundles:
+ self._all_updates.offer(
+ _ExecutorServiceParallelExecutor._ExecutorUpdate(
+ transform_executor,
+ unprocessed_bundle=unprocessed_bundle))
return output_committed_bundles
- def handle_exception(self, exception):
+ def handle_exception(self, transform_executor, exception):
self._all_updates.offer(
- _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception))
+ _ExecutorServiceParallelExecutor._ExecutorUpdate(
+ transform_executor, exception=exception))
class TransformExecutor(_ExecutorService.CallableTask):
@@ -312,10 +320,10 @@ class TransformExecutor(_ExecutorService.CallableTask):
self._evaluation_context.append_to_cache(
self._applied_ptransform, tag, value)
- self._completion_callback.handle_result(self._input_bundle, result)
+ self._completion_callback.handle_result(self, self._input_bundle, result)
return result
except Exception as e: # pylint: disable=broad-except
- self._completion_callback.handle_exception(e)
+ self._completion_callback.handle_exception(self, e)
finally:
self._evaluation_context.metrics().commit_physical(
self._input_bundle,
@@ -387,6 +395,10 @@ class _ExecutorServiceParallelExecutor(object):
self.schedule_consumption(applied_ptransform, committed_bundle, [],
self.default_completion_callback)
+ def schedule_unprocessed_bundle(self, applied_ptransform,
+ unprocessed_bundle):
+ self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle)
+
def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
fired_timers, on_complete):
"""Schedules evaluation of the given bundle with the transform."""
@@ -433,10 +445,16 @@ class _ExecutorServiceParallelExecutor(object):
class _ExecutorUpdate(object):
"""An internal status update on the state of the executor."""
- def __init__(self, produced_bundle=None, exception=None):
+ def __init__(self, transform_executor, committed_bundle=None,
+ unprocessed_bundle=None, exception=None):
+ self.transform_executor = transform_executor
# Exactly one of them should be not-None
- assert bool(produced_bundle) != bool(exception)
- self.committed_bundle = produced_bundle
+ assert sum([
+ bool(committed_bundle),
+ bool(unprocessed_bundle),
+ bool(exception)]) == 1
+ self.committed_bundle = committed_bundle
+ self.unprocessed_bundle = unprocessed_bundle
self.exception = exception
self.exc_info = sys.exc_info()
if self.exc_info[1] is not exception:
@@ -471,6 +489,10 @@ class _ExecutorServiceParallelExecutor(object):
while update:
if update.committed_bundle:
self._executor.schedule_consumers(update.committed_bundle)
+ elif update.unprocessed_bundle:
+ self._executor.schedule_unprocessed_bundle(
+ update.transform_executor._applied_ptransform,
+ update.unprocessed_bundle)
else:
assert update.exception
logging.warning('A task failed with exception.\n %s',
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index e92d799..3aefbb8 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -31,6 +31,10 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager
from apache_beam.runners.direct.util import KeyedWorkItem
from apache_beam.runners.direct.util import TransformResult
from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_stream import ElementEvent
+from apache_beam.testing.test_stream import WatermarkEvent
+from apache_beam.testing.test_stream import ProcessingTimeEvent
from apache_beam.transforms import core
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowedValue
@@ -41,6 +45,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
from apache_beam.typehints.typecheck import TypeCheckError
from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
from apache_beam.utils import counters
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.options.pipeline_options import TypeOptions
@@ -59,9 +64,11 @@ class TransformEvaluatorRegistry(object):
core.ParDo: _ParDoEvaluator,
core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
_NativeWrite: _NativeWriteEvaluator,
+ TestStream: _TestStreamEvaluator,
}
self._root_bundle_providers = {
core.PTransform: DefaultRootBundleProvider,
+ TestStream: _TestStreamRootBundleProvider,
}
def get_evaluator(
@@ -142,6 +149,23 @@ class DefaultRootBundleProvider(RootBundleProvider):
return [empty_bundle]
+class _TestStreamRootBundleProvider(RootBundleProvider):
+ """Provides an initial bundle for the TestStream evaluator."""
+
+ def get_root_bundles(self):
+ test_stream = self._applied_ptransform.transform
+ bundles = []
+ if len(test_stream.events) > 0:
+ bundle = self._evaluation_context.create_bundle(
+ pvalue.PBegin(self._applied_ptransform.transform.pipeline))
+ # Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the
+ # watermark.
+ bundle.add(GlobalWindows.windowed_value(0, timestamp=MIN_TIMESTAMP))
+ bundle.commit(None)
+ bundles.append(bundle)
+ return bundles
+
+
class _TransformEvaluator(object):
"""An evaluator of a specific application of a transform."""
@@ -265,7 +289,61 @@ class _BoundedReadEvaluator(_TransformEvaluator):
bundles = _read_values_to_bundles(reader)
return TransformResult(
- self._applied_ptransform, bundles, None, None)
+ self._applied_ptransform, bundles, [], None, None)
+
+
+class _TestStreamEvaluator(_TransformEvaluator):
+ """TransformEvaluator for the TestStream transform."""
+
+ def __init__(self, evaluation_context, applied_ptransform,
+ input_committed_bundle, side_inputs, scoped_metrics_container):
+ assert not side_inputs
+ self.test_stream = applied_ptransform.transform
+ super(_TestStreamEvaluator, self).__init__(
+ evaluation_context, applied_ptransform, input_committed_bundle,
+ side_inputs, scoped_metrics_container)
+
+ def start_bundle(self):
+ self.current_index = -1
+ self.watermark = MIN_TIMESTAMP
+ self.bundles = []
+
+ def process_element(self, element):
+ index = element.value
+ self.watermark = element.timestamp
+ assert isinstance(index, int)
+ assert 0 <= index <= len(self.test_stream.events)
+ self.current_index = index
+ event = self.test_stream.events[self.current_index]
+ if isinstance(event, ElementEvent):
+ assert len(self._outputs) == 1
+ output_pcollection = list(self._outputs)[0]
+ bundle = self._evaluation_context.create_bundle(output_pcollection)
+ for tv in event.timestamped_values:
+ bundle.output(
+ GlobalWindows.windowed_value(tv.value, timestamp=tv.timestamp))
+ self.bundles.append(bundle)
+ elif isinstance(event, WatermarkEvent):
+ assert event.new_watermark >= self.watermark
+ self.watermark = event.new_watermark
+ elif isinstance(event, ProcessingTimeEvent):
+ # TODO(ccy): advance processing time in the context's mock clock.
+ pass
+ else:
+ raise ValueError('Invalid TestStream event: %s.' % event)
+
+ def finish_bundle(self):
+ unprocessed_bundles = []
+ hold = None
+ if self.current_index < len(self.test_stream.events) - 1:
+ unprocessed_bundle = self._evaluation_context.create_bundle(
+ pvalue.PBegin(self._applied_ptransform.transform.pipeline))
+ unprocessed_bundle.add(GlobalWindows.windowed_value(
+ self.current_index + 1, timestamp=self.watermark))
+ unprocessed_bundles.append(unprocessed_bundle)
+ hold = self.watermark
+ return TransformResult(
+ self._applied_ptransform, self.bundles, unprocessed_bundles, None, hold)
class _FlattenEvaluator(_TransformEvaluator):
@@ -289,7 +367,7 @@ class _FlattenEvaluator(_TransformEvaluator):
def finish_bundle(self):
bundles = [self.bundle]
return TransformResult(
- self._applied_ptransform, bundles, None, None)
+ self._applied_ptransform, bundles, [], None, None)
class _TaggedReceivers(dict):
@@ -378,7 +456,7 @@ class _ParDoEvaluator(_TransformEvaluator):
bundles = self._tagged_receivers.values()
result_counters = self._counter_factory.get_counters()
return TransformResult(
- self._applied_ptransform, bundles, result_counters, None,
+ self._applied_ptransform, bundles, [], result_counters, None,
self._tagged_receivers.undeclared_in_memory_tag_values)
@@ -469,7 +547,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, bundles, None, hold)
+ self._applied_ptransform, bundles, [], None, hold)
class _NativeWriteEvaluator(_TransformEvaluator):
@@ -534,4 +612,4 @@ class _NativeWriteEvaluator(_TransformEvaluator):
None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, [], None, hold)
+ self._applied_ptransform, [], [], None, hold)
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
index daaaceb..8c846fc 100644
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -27,9 +27,11 @@ class TransformResult(object):
"""Result of evaluating an AppliedPTransform with a TransformEvaluator."""
def __init__(self, applied_ptransform, uncommitted_output_bundles,
- counters, watermark_hold, undeclared_tag_values=None):
+ unprocessed_bundles, counters, watermark_hold,
+ undeclared_tag_values=None):
self.transform = applied_ptransform
self.uncommitted_output_bundles = uncommitted_output_bundles
+ self.unprocessed_bundles = unprocessed_bundles
self.counters = counters
self.watermark_hold = watermark_hold
# Only used when caching (materializing) all values is requested.
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 10d25d7..2146bb5 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -93,17 +93,19 @@ class WatermarkManager(object):
return self._transform_to_watermarks[applied_ptransform]
def update_watermarks(self, completed_committed_bundle, applied_ptransform,
- completed_timers, outputs, earliest_hold):
+ completed_timers, outputs, unprocessed_bundles,
+ earliest_hold):
assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
self._update_pending(
completed_committed_bundle, applied_ptransform, completed_timers,
- outputs)
+ outputs, unprocessed_bundles)
tw = self.get_watermarks(applied_ptransform)
tw.hold(earliest_hold)
self._refresh_watermarks(applied_ptransform)
def _update_pending(self, input_committed_bundle, applied_ptransform,
- completed_timers, output_committed_bundles):
+ completed_timers, output_committed_bundles,
+ unprocessed_bundles):
"""Updated list of pending bundles for the given AppliedPTransform."""
# Update pending elements. Filter out empty bundles. They do not impact
@@ -119,6 +121,9 @@ class WatermarkManager(object):
completed_tw = self._transform_to_watermarks[applied_ptransform]
completed_tw.update_timers(completed_timers)
+ for unprocessed_bundle in unprocessed_bundles:
+ completed_tw.add_pending(unprocessed_bundle)
+
assert input_committed_bundle or applied_ptransform in self._root_transforms
if input_committed_bundle and input_committed_bundle.has_elements():
completed_tw.remove_pending(input_committed_bundle)
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index a06bcd0..7989fb2 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -24,8 +24,10 @@ from abc import ABCMeta
from abc import abstractmethod
from apache_beam import coders
+from apache_beam import core
from apache_beam import pvalue
from apache_beam.transforms import PTransform
+from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue
@@ -99,6 +101,9 @@ class TestStream(PTransform):
self.current_watermark = timestamp.MIN_TIMESTAMP
self.events = []
+ def get_windowing(self, unused_inputs):
+ return core.Windowing(window.GlobalWindows())
+
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index e32dda2..bf05ac1 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -19,6 +19,8 @@
import unittest
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import ElementEvent
from apache_beam.testing.test_stream import ProcessingTimeEvent
from apache_beam.testing.test_stream import TestStream
@@ -78,6 +80,41 @@ class TestStreamTest(unittest.TestCase):
TimestampedValue('a', timestamp.MAX_TIMESTAMP)
]))
+ def test_basic_execution(self):
+ test_stream = (TestStream()
+ .advance_watermark_to(10)
+ .add_elements(['a', 'b', 'c'])
+ .advance_watermark_to(20)
+ .add_elements(['d'])
+ .add_elements(['e'])
+ .advance_processing_time(10)
+ .advance_watermark_to(300)
+ .add_elements([TimestampedValue('late', 12)])
+ .add_elements([TimestampedValue('last', 310)]))
+
+ global _seen_elements # pylint: disable=global-variable-undefined
+ _seen_elements = []
+
+ class RecordFn(beam.DoFn):
+ def process(self, element=beam.DoFn.ElementParam,
+ timestamp=beam.DoFn.TimestampParam):
+ _seen_elements.append((element, timestamp))
+
+ p = TestPipeline()
+ my_record_fn = RecordFn()
+ p | test_stream | beam.ParDo(my_record_fn) # pylint: disable=expression-not-assigned
+ p.run()
+
+ self.assertEqual([
+ ('a', timestamp.Timestamp(10)),
+ ('b', timestamp.Timestamp(10)),
+ ('c', timestamp.Timestamp(10)),
+ ('d', timestamp.Timestamp(20)),
+ ('e', timestamp.Timestamp(20)),
+ ('late', timestamp.Timestamp(12)),
+ ('last', timestamp.Timestamp(310)),], _seen_elements)
+ del _seen_elements
+
if __name__ == '__main__':
unittest.main()
[2/2] beam git commit: This closes #3409
Posted by al...@apache.org.
This closes #3409
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0467b72
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0467b72
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0467b72
Branch: refs/heads/master
Commit: f0467b72fd8dfbffaaeb353abfaa1fefa1ee0092
Parents: 28c6fd4 3520f94
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jun 21 13:44:08 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 21 13:44:08 2017 -0700
----------------------------------------------------------------------
.../runners/direct/evaluation_context.py | 14 ++--
.../apache_beam/runners/direct/executor.py | 40 +++++++--
.../runners/direct/transform_evaluator.py | 88 ++++++++++++++++++--
sdks/python/apache_beam/runners/direct/util.py | 4 +-
.../runners/direct/watermark_manager.py | 11 ++-
sdks/python/apache_beam/testing/test_stream.py | 5 ++
.../apache_beam/testing/test_stream_test.py | 37 ++++++++
7 files changed, 176 insertions(+), 23 deletions(-)
----------------------------------------------------------------------