You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/27 05:49:33 UTC
[1/2] beam git commit: Implement streaming GroupByKey in Python DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 95e6bbe50 -> 8036001da
Implement streaming GroupByKey in Python DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb379e76
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb379e76
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb379e76
Branch: refs/heads/master
Commit: eb379e76adaa9c4b4e24a4b3c5757be8523d95c4
Parents: 95e6bbe
Author: Charles Chen <cc...@google.com>
Authored: Mon Jun 26 16:54:00 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Jun 26 22:49:06 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 29 +++-
.../runners/direct/evaluation_context.py | 2 +-
.../runners/direct/transform_evaluator.py | 138 ++++++++++++++++++-
sdks/python/apache_beam/runners/direct/util.py | 25 ++--
.../runners/direct/watermark_manager.py | 26 ++--
.../apache_beam/testing/test_stream_test.py | 37 ++++-
sdks/python/apache_beam/transforms/trigger.py | 16 +++
7 files changed, 239 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index d80ef10..2a75977 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -34,6 +34,7 @@ from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
from apache_beam.transforms.core import _GroupAlsoByWindow
+from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
@@ -47,6 +48,13 @@ K = typehints.TypeVariable('K')
V = typehints.TypeVariable('V')
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
+ """Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
+ pass
+
+
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
@@ -79,17 +87,24 @@ class DirectRunner(PipelineRunner):
except NotImplementedError:
return transform.expand(pcoll)
+ def apply__GroupByKeyOnly(self, transform, pcoll):
+ if (transform.__class__ == _GroupByKeyOnly and
+ pcoll.pipeline._options.view_as(StandardOptions).streaming):
+ # Use specialized streaming implementation, if requested.
+ type_hints = transform.get_type_hints()
+ return pcoll | (_StreamingGroupByKeyOnly()
+ .with_input_types(*type_hints.input_types[0])
+ .with_output_types(*type_hints.output_types[0]))
+ return transform.expand(pcoll)
+
def apply__GroupAlsoByWindow(self, transform, pcoll):
if (transform.__class__ == _GroupAlsoByWindow and
pcoll.pipeline._options.view_as(StandardOptions).streaming):
# Use specialized streaming implementation, if requested.
- raise NotImplementedError(
- 'Streaming support is not yet available on the DirectRunner.')
- # TODO(ccy): enable when streaming implementation is plumbed through.
- # type_hints = transform.get_type_hints()
- # return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
- # .with_input_types(*type_hints.input_types[0])
- # .with_output_types(*type_hints.output_types[0]))
+ type_hints = transform.get_type_hints()
+ return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
+ .with_input_types(*type_hints.input_types[0])
+ .with_output_types(*type_hints.output_types[0]))
return transform.expand(pcoll)
def run(self, pipeline):
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 669a68a..54c407c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -213,7 +213,7 @@ class EvaluationContext(object):
result.unprocessed_bundles)
self._watermark_manager.update_watermarks(
completed_bundle, result.transform, completed_timers,
- committed_bundles, unprocessed_bundles, result.watermark_hold)
+ committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
self._metrics.commit_logical(completed_bundle,
result.logical_metric_updates)
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 3aefbb8..67b2492 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -27,6 +27,8 @@ from apache_beam.internal import pickler
import apache_beam.io as io
from apache_beam.runners.common import DoFnRunner
from apache_beam.runners.common import DoFnState
+from apache_beam.runners.direct.direct_runner import _StreamingGroupByKeyOnly
+from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
from apache_beam.runners.direct.watermark_manager import WatermarkManager
from apache_beam.runners.direct.util import KeyedWorkItem
from apache_beam.runners.direct.util import TransformResult
@@ -38,6 +40,7 @@ from apache_beam.testing.test_stream import ProcessingTimeEvent
from apache_beam.transforms import core
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowedValue
+from apache_beam.transforms.trigger import create_trigger_driver
from apache_beam.transforms.trigger import _CombiningValueStateTag
from apache_beam.transforms.trigger import _ListStateTag
from apache_beam.transforms.trigger import TimeDomain
@@ -63,6 +66,8 @@ class TransformEvaluatorRegistry(object):
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+ _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
+ _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
_NativeWrite: _NativeWriteEvaluator,
TestStream: _TestStreamEvaluator,
}
@@ -125,7 +130,10 @@ class TransformEvaluatorRegistry(object):
True if executor should execute applied_ptransform serially.
"""
return isinstance(applied_ptransform.transform,
- (core._GroupByKeyOnly, _NativeWrite))
+ (core._GroupByKeyOnly,
+ _StreamingGroupByKeyOnly,
+ _StreamingGroupAlsoByWindow,
+ _NativeWrite,))
class RootBundleProvider(object):
@@ -234,7 +242,7 @@ class _TransformEvaluator(object):
timer and passes it to process_element(). Evaluator subclasses which
desire different timer delivery semantics can override process_timer().
"""
- state = self.step_context.get_keyed_state(timer_firing.key)
+ state = self.step_context.get_keyed_state(timer_firing.encoded_key)
state.clear_timer(
timer_firing.window, timer_firing.name, timer_firing.time_domain)
self.process_timer(timer_firing)
@@ -242,7 +250,9 @@ class _TransformEvaluator(object):
def process_timer(self, timer_firing):
"""Default process_timer() impl. generating KeyedWorkItem element."""
self.process_element(
- KeyedWorkItem(timer_firing.key, timer_firing=timer_firing))
+ GlobalWindows.windowed_value(
+ KeyedWorkItem(timer_firing.encoded_key,
+ timer_firings=[timer_firing])))
def process_element(self, element):
"""Processes a new element as part of the current bundle."""
@@ -343,7 +353,8 @@ class _TestStreamEvaluator(_TransformEvaluator):
unprocessed_bundles.append(unprocessed_bundle)
hold = self.watermark
return TransformResult(
- self._applied_ptransform, self.bundles, unprocessed_bundles, None, hold)
+ self._applied_ptransform, self.bundles, unprocessed_bundles, None,
+ {None: hold})
class _FlattenEvaluator(_TransformEvaluator):
@@ -547,7 +558,122 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, bundles, [], None, hold)
+ self._applied_ptransform, bundles, [], None, {None: hold})
+
+
+class _StreamingGroupByKeyOnlyEvaluator(_TransformEvaluator):
+ """TransformEvaluator for _StreamingGroupByKeyOnly transform.
+
+ The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes
+ to infinity, which is suitable for batch mode execution. During streaming
+ mode execution, we emit each bundle to the next transform as it arrives.
+ """
+
+ MAX_ELEMENT_PER_BUNDLE = None
+
+ def __init__(self, evaluation_context, applied_ptransform,
+ input_committed_bundle, side_inputs, scoped_metrics_container):
+ assert not side_inputs
+ super(_StreamingGroupByKeyOnlyEvaluator, self).__init__(
+ evaluation_context, applied_ptransform, input_committed_bundle,
+ side_inputs, scoped_metrics_container)
+
+ def start_bundle(self):
+ self.gbk_items = collections.defaultdict(list)
+
+ assert len(self._outputs) == 1
+ self.output_pcollection = list(self._outputs)[0]
+
+ # The input type of a GroupByKey will be KV[Any, Any] or more specific.
+ kv_type_hint = (
+ self._applied_ptransform.transform.get_type_hints().input_types[0])
+ self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+
+ def process_element(self, element):
+ if (isinstance(element, WindowedValue)
+ and isinstance(element.value, collections.Iterable)
+ and len(element.value) == 2):
+ k, v = element.value
+ self.gbk_items[self.key_coder.encode(k)].append(v)
+ else:
+ raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
+ 'windowed key-value pairs. Instead received: %r.'
+ % element)
+
+ def finish_bundle(self):
+ bundles = []
+ bundle = None
+ for encoded_k, vs in self.gbk_items.iteritems():
+ if not bundle:
+ bundle = self._evaluation_context.create_bundle(
+ self.output_pcollection)
+ bundles.append(bundle)
+ kwi = KeyedWorkItem(encoded_k, elements=vs)
+ bundle.add(GlobalWindows.windowed_value(kwi))
+
+ return TransformResult(
+ self._applied_ptransform, bundles, [], None, None)
+
+
+class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
+ """TransformEvaluator for the _StreamingGroupAlsoByWindow transform.
+
+ This evaluator is only used in streaming mode. In batch mode, the
+ GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined
+ in transforms/core.py.
+ """
+
+ def __init__(self, evaluation_context, applied_ptransform,
+ input_committed_bundle, side_inputs, scoped_metrics_container):
+ assert not side_inputs
+ super(_StreamingGroupAlsoByWindowEvaluator, self).__init__(
+ evaluation_context, applied_ptransform, input_committed_bundle,
+ side_inputs, scoped_metrics_container)
+
+ def start_bundle(self):
+ assert len(self._outputs) == 1
+ self.output_pcollection = list(self._outputs)[0]
+ self.step_context = self._execution_context.get_step_context()
+ self.driver = create_trigger_driver(
+ self._applied_ptransform.transform.windowing)
+ self.gabw_items = []
+ self.keyed_holds = {}
+
+ # The input type of a GroupAlsoByWindow will be KV[Any, Iter[Any]] or more
+ # specific.
+ kv_type_hint = (
+ self._applied_ptransform.transform.get_type_hints().input_types[0])
+ self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+
+ def process_element(self, element):
+ kwi = element.value
+ assert isinstance(kwi, KeyedWorkItem), kwi
+ encoded_k, timer_firings, vs = (
+ kwi.encoded_key, kwi.timer_firings, kwi.elements)
+ k = self.key_coder.decode(encoded_k)
+ state = self.step_context.get_keyed_state(encoded_k)
+
+ for timer_firing in timer_firings:
+ for wvalue in self.driver.process_timer(
+ timer_firing.window, timer_firing.name, timer_firing.time_domain,
+ timer_firing.timestamp, state):
+ self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
+ if vs:
+ for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
+ self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
+
+ self.keyed_holds[encoded_k] = state.get_earliest_hold()
+
+ def finish_bundle(self):
+ bundles = []
+ if self.gabw_items:
+ bundle = self._evaluation_context.create_bundle(self.output_pcollection)
+ for item in self.gabw_items:
+ bundle.add(item)
+ bundles.append(bundle)
+
+ return TransformResult(
+ self._applied_ptransform, bundles, [], None, self.keyed_holds)
class _NativeWriteEvaluator(_TransformEvaluator):
@@ -612,4 +738,4 @@ class _NativeWriteEvaluator(_TransformEvaluator):
None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, [], [], None, hold)
+ self._applied_ptransform, [], [], None, {None: hold})
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
index 8c846fc..10f7b29 100644
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -27,13 +27,21 @@ class TransformResult(object):
"""Result of evaluating an AppliedPTransform with a TransformEvaluator."""
def __init__(self, applied_ptransform, uncommitted_output_bundles,
- unprocessed_bundles, counters, watermark_hold,
+ unprocessed_bundles, counters, keyed_watermark_holds,
undeclared_tag_values=None):
self.transform = applied_ptransform
self.uncommitted_output_bundles = uncommitted_output_bundles
self.unprocessed_bundles = unprocessed_bundles
self.counters = counters
- self.watermark_hold = watermark_hold
+ # Mapping of key -> earliest hold timestamp or None. Keys should be
+ # strings or None.
+ #
+ # For each key, we receive as its corresponding value the earliest
+ # watermark hold for that key (the key can be None for global state), past
+ # which the output watermark for the currently-executing step will not
+ # advance. If the value is None or utils.timestamp.MAX_TIMESTAMP, the
+ # watermark hold will be removed.
+ self.keyed_watermark_holds = keyed_watermark_holds or {}
# Only used when caching (materializing) all values is requested.
self.undeclared_tag_values = undeclared_tag_values
# Populated by the TransformExecutor.
@@ -43,8 +51,8 @@ class TransformResult(object):
class TimerFiring(object):
"""A single instance of a fired timer."""
- def __init__(self, key, window, name, time_domain, timestamp):
- self.key = key
+ def __init__(self, encoded_key, window, name, time_domain, timestamp):
+ self.encoded_key = encoded_key
self.window = window
self.name = name
self.time_domain = time_domain
@@ -53,8 +61,7 @@ class TimerFiring(object):
class KeyedWorkItem(object):
"""A keyed item that can either be a timer firing or a list of elements."""
- def __init__(self, key, timer_firing=None, elements=None):
- self.key = key
- assert not timer_firing and elements
- self.timer_firing = timer_firing
- self.elements = elements
+ def __init__(self, encoded_key, timer_firings=None, elements=None):
+ self.encoded_key = encoded_key
+ self.timer_firings = timer_firings or []
+ self.elements = elements or []
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 4aa2bb4..935998d 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -94,13 +94,13 @@ class WatermarkManager(object):
def update_watermarks(self, completed_committed_bundle, applied_ptransform,
completed_timers, outputs, unprocessed_bundles,
- earliest_hold):
+ keyed_earliest_holds):
assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
self._update_pending(
completed_committed_bundle, applied_ptransform, completed_timers,
outputs, unprocessed_bundles)
tw = self.get_watermarks(applied_ptransform)
- tw.hold(earliest_hold)
+ tw.hold(keyed_earliest_holds)
self._refresh_watermarks(applied_ptransform)
def _update_pending(self, input_committed_bundle, applied_ptransform,
@@ -161,7 +161,7 @@ class _TransformWatermarks(object):
self._input_transform_watermarks = []
self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
- self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
+ self._keyed_earliest_holds = {}
self._pending = set() # Scheduled bundles targeted for this transform.
self._fired_timers = set()
self._lock = threading.Lock()
@@ -187,11 +187,13 @@ class _TransformWatermarks(object):
with self._lock:
return self._output_watermark
- def hold(self, value):
+ def hold(self, keyed_earliest_holds):
with self._lock:
- if value is None:
- value = WatermarkManager.WATERMARK_POS_INF
- self._earliest_hold = value
+ for key, hold_value in keyed_earliest_holds.iteritems():
+ self._keyed_earliest_holds[key] = hold_value
+ if (hold_value is None or
+ hold_value == WatermarkManager.WATERMARK_POS_INF):
+ del self._keyed_earliest_holds[key]
def add_pending(self, pending):
with self._lock:
@@ -230,7 +232,11 @@ class _TransformWatermarks(object):
self._input_watermark = max(self._input_watermark,
min(pending_holder, producer_watermark))
- new_output_watermark = min(self._input_watermark, self._earliest_hold)
+ earliest_hold = WatermarkManager.WATERMARK_POS_INF
+ for hold in self._keyed_earliest_holds.values():
+ if hold < earliest_hold:
+ earliest_hold = hold
+ new_output_watermark = min(self._input_watermark, earliest_hold)
advanced = new_output_watermark > self._output_watermark
self._output_watermark = new_output_watermark
@@ -246,11 +252,11 @@ class _TransformWatermarks(object):
return False
fired_timers = []
- for key, state in self._keyed_states.iteritems():
+ for encoded_key, state in self._keyed_states.iteritems():
timers = state.get_timers(watermark=self._input_watermark)
for expired in timers:
window, (name, time_domain, timestamp) = expired
fired_timers.append(
- TimerFiring(key, window, name, time_domain, timestamp))
+ TimerFiring(encoded_key, window, name, time_domain, timestamp))
self._fired_timers.update(fired_timers)
return fired_timers
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 071c7cd..b7ca141 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -20,12 +20,15 @@
import unittest
import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import ElementEvent
from apache_beam.testing.test_stream import ProcessingTimeEvent
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.test_stream import WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue
@@ -98,7 +101,9 @@ class TestStreamTest(unittest.TestCase):
timestamp=beam.DoFn.TimestampParam):
yield (element, timestamp)
- p = TestPipeline()
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
my_record_fn = RecordFn()
records = p | test_stream | beam.ParDo(my_record_fn)
assert_that(records, equal_to([
@@ -111,6 +116,36 @@ class TestStreamTest(unittest.TestCase):
('last', timestamp.Timestamp(310)),]))
p.run()
+ def test_gbk_execution(self):
+ test_stream = (TestStream()
+ .advance_watermark_to(10)
+ .add_elements(['a', 'b', 'c'])
+ .advance_watermark_to(20)
+ .add_elements(['d'])
+ .add_elements(['e'])
+ .advance_processing_time(10)
+ .advance_watermark_to(300)
+ .add_elements([TimestampedValue('late', 12)])
+ .add_elements([TimestampedValue('last', 310)]))
+
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
+ records = (p
+ | test_stream
+ | beam.WindowInto(FixedWindows(15))
+ | beam.Map(lambda x: ('k', x))
+ | beam.GroupByKey())
+ # TODO(BEAM-2519): timestamp assignment for elements from a GBK should
+ # respect the TimestampCombiner. The test below should also verify the
+ # timestamps of the outputted elements once this is implemented.
+ assert_that(records, equal_to([
+ ('k', ['a', 'b', 'c']),
+ ('k', ['d', 'e']),
+ ('k', ['late']),
+ ('k', ['last'])]))
+ p.run()
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 7ff44fa..f77fa1a 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -36,6 +36,7 @@ from apache_beam.transforms.window import WindowFn
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import TIME_GRANULARITY
# AfterCount is experimental. No backwards compatibility guarantees.
@@ -1066,6 +1067,8 @@ class InMemoryUnmergedState(UnmergedState):
def clear_timer(self, window, name, time_domain):
self.timers[window].pop((name, time_domain), None)
+ if not self.timers[window]:
+ del self.timers[window]
def get_window(self, window_id):
return window_id
@@ -1117,6 +1120,19 @@ class InMemoryUnmergedState(UnmergedState):
def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
return self.get_timers(clear=True, watermark=watermark)
+ def get_earliest_hold(self):
+ earliest_hold = MAX_TIMESTAMP
+ for unused_window, tagged_states in self.state.iteritems():
+ # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is
+ # named "watermark". This is currently only true because the only place
+ # watermark holds are set is in the GeneralTriggerDriver, where we use
+ # this name. We should fix this by allowing enumeration of the tag types
+ # used in adding state.
+ if 'watermark' in tagged_states and tagged_states['watermark']:
+ hold = min(tagged_states['watermark']) - TIME_GRANULARITY
+ earliest_hold = min(earliest_hold, hold)
+ return earliest_hold
+
def __repr__(self):
state_str = '\n'.join('%s: %s' % (key, dict(state))
for key, state in self.state.items())
[2/2] beam git commit: This closes #3444
Posted by al...@apache.org.
This closes #3444
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8036001d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8036001d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8036001d
Branch: refs/heads/master
Commit: 8036001da6f90eac20d787a05d35a51e30146278
Parents: 95e6bbe eb379e7
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jun 26 22:49:19 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Jun 26 22:49:19 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 29 +++-
.../runners/direct/evaluation_context.py | 2 +-
.../runners/direct/transform_evaluator.py | 138 ++++++++++++++++++-
sdks/python/apache_beam/runners/direct/util.py | 25 ++--
.../runners/direct/watermark_manager.py | 26 ++--
.../apache_beam/testing/test_stream_test.py | 37 ++++-
sdks/python/apache_beam/transforms/trigger.py | 16 +++
7 files changed, 239 insertions(+), 34 deletions(-)
----------------------------------------------------------------------