You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:04:31 UTC
[05/50] beam git commit: Use state / timer API for DirectRunner timer
firings
Use state / timer API for DirectRunner timer firings
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56041b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56041b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56041b78
Branch: refs/heads/gearpump-runner
Commit: 56041b7850abfbb10d4a6ff2ddecb227a0a4e7c8
Parents: 50acc6c
Author: Charles Chen <cc...@google.com>
Authored: Tue Jun 20 15:22:58 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 21 09:23:13 2017 -0700
----------------------------------------------------------------------
.../runners/direct/evaluation_context.py | 3 +-
.../apache_beam/runners/direct/executor.py | 37 ++++++++-----
.../runners/direct/transform_evaluator.py | 48 +++++++++++++---
.../runners/direct/transform_result.py | 40 --------------
sdks/python/apache_beam/runners/direct/util.py | 58 ++++++++++++++++++++
.../runners/direct/watermark_manager.py | 56 +++++++++++--------
sdks/python/apache_beam/transforms/trigger.py | 10 +++-
7 files changed, 163 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8fa8e06..976e9e8 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -148,7 +148,8 @@ class EvaluationContext(object):
self._transform_keyed_states = self._initialize_keyed_states(
root_transforms, value_to_consumers)
self._watermark_manager = WatermarkManager(
- Clock(), root_transforms, value_to_consumers)
+ Clock(), root_transforms, value_to_consumers,
+ self._transform_keyed_states)
self._side_inputs_container = _SideInputsContainer(views)
self._pending_unblocked_tasks = []
self._counter_factory = counters.CounterFactory()
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index eff2d3c..a0a3886 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -222,14 +222,14 @@ class _CompletionCallback(object):
or for a source transform.
"""
- def __init__(self, evaluation_context, all_updates, timers=None):
+ def __init__(self, evaluation_context, all_updates, timer_firings=None):
self._evaluation_context = evaluation_context
self._all_updates = all_updates
- self._timers = timers
+ self._timer_firings = timer_firings or []
def handle_result(self, input_committed_bundle, transform_result):
output_committed_bundles = self._evaluation_context.handle_result(
- input_committed_bundle, self._timers, transform_result)
+ input_committed_bundle, self._timer_firings, transform_result)
for output_committed_bundle in output_committed_bundles:
self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
output_committed_bundle, None))
@@ -251,11 +251,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
"""
def __init__(self, transform_evaluator_registry, evaluation_context,
- input_bundle, applied_ptransform, completion_callback,
- transform_evaluation_state):
+ input_bundle, fired_timers, applied_ptransform,
+ completion_callback, transform_evaluation_state):
self._transform_evaluator_registry = transform_evaluator_registry
self._evaluation_context = evaluation_context
self._input_bundle = input_bundle
+ self._fired_timers = fired_timers
self._applied_ptransform = applied_ptransform
self._completion_callback = completion_callback
self._transform_evaluation_state = transform_evaluation_state
@@ -288,6 +289,10 @@ class TransformExecutor(_ExecutorService.CallableTask):
self._applied_ptransform, self._input_bundle,
side_input_values, scoped_metrics_container)
+ if self._fired_timers:
+ for timer_firing in self._fired_timers:
+ evaluator.process_timer_wrapper(timer_firing)
+
if self._input_bundle:
for value in self._input_bundle.get_elements_iterable():
evaluator.process_element(value)
@@ -379,11 +384,11 @@ class _ExecutorServiceParallelExecutor(object):
if committed_bundle.pcollection in self.value_to_consumers:
consumers = self.value_to_consumers[committed_bundle.pcollection]
for applied_ptransform in consumers:
- self.schedule_consumption(applied_ptransform, committed_bundle,
+ self.schedule_consumption(applied_ptransform, committed_bundle, [],
self.default_completion_callback)
def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
- on_complete):
+ fired_timers, on_complete):
"""Schedules evaluation of the given bundle with the transform."""
assert consumer_applied_ptransform
assert committed_bundle
@@ -397,8 +402,8 @@ class _ExecutorServiceParallelExecutor(object):
transform_executor = TransformExecutor(
self.transform_evaluator_registry, self.evaluation_context,
- committed_bundle, consumer_applied_ptransform, on_complete,
- transform_executor_service)
+ committed_bundle, fired_timers, consumer_applied_ptransform,
+ on_complete, transform_executor_service)
transform_executor_service.schedule(transform_executor)
class _TypedUpdateQueue(object):
@@ -527,19 +532,21 @@ class _ExecutorServiceParallelExecutor(object):
Returns:
True if timers fired.
"""
- fired_timers = self._executor.evaluation_context.extract_fired_timers()
- for applied_ptransform in fired_timers:
+ transform_fired_timers = (
+ self._executor.evaluation_context.extract_fired_timers())
+ for applied_ptransform, fired_timers in transform_fired_timers:
# Use an empty committed bundle. just to trigger.
empty_bundle = (
self._executor.evaluation_context.create_empty_committed_bundle(
applied_ptransform.inputs[0]))
timer_completion_callback = _CompletionCallback(
self._executor.evaluation_context, self._executor.all_updates,
- applied_ptransform)
+ timer_firings=fired_timers)
self._executor.schedule_consumption(
- applied_ptransform, empty_bundle, timer_completion_callback)
- return bool(fired_timers)
+ applied_ptransform, empty_bundle, fired_timers,
+ timer_completion_callback)
+ return bool(transform_fired_timers)
def _is_executing(self):
"""Returns True if there is at least one non-blocked TransformExecutor."""
@@ -582,6 +589,6 @@ class _ExecutorServiceParallelExecutor(object):
applied_ptransform, [])
for bundle in pending_bundles:
self._executor.schedule_consumption(
- applied_ptransform, bundle,
+ applied_ptransform, bundle, [],
self._executor.default_completion_callback)
self._executor.node_to_pending_bundles[applied_ptransform] = []
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6e73561..e92d799 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -28,13 +28,15 @@ import apache_beam.io as io
from apache_beam.runners.common import DoFnRunner
from apache_beam.runners.common import DoFnState
from apache_beam.runners.direct.watermark_manager import WatermarkManager
-from apache_beam.runners.direct.transform_result import TransformResult
+from apache_beam.runners.direct.util import KeyedWorkItem
+from apache_beam.runners.direct.util import TransformResult
from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access
from apache_beam.transforms import core
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.trigger import _CombiningValueStateTag
from apache_beam.transforms.trigger import _ListStateTag
+from apache_beam.transforms.trigger import TimeDomain
from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
from apache_beam.typehints.typecheck import TypeCheckError
from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
@@ -199,6 +201,25 @@ class _TransformEvaluator(object):
"""Starts a new bundle."""
pass
+ def process_timer_wrapper(self, timer_firing):
+ """Process timer by clearing and then calling process_timer().
+
+ This method is called with any timer firing and clears the delivered
+ timer from the keyed state and then calls process_timer(). The default
+ process_timer() implementation emits a KeyedWorkItem for the particular
+ timer and passes it to process_element(). Evaluator subclasses which
+ desire different timer delivery semantics can override process_timer().
+ """
+ state = self.step_context.get_keyed_state(timer_firing.key)
+ state.clear_timer(
+ timer_firing.window, timer_firing.name, timer_firing.time_domain)
+ self.process_timer(timer_firing)
+
+ def process_timer(self, timer_firing):
+ """Default process_timer() impl. generating KeyedWorkItem element."""
+ self.process_element(
+ KeyedWorkItem(timer_firing.key, timer_firing=timer_firing))
+
def process_element(self, element):
"""Processes a new element as part of the current bundle."""
raise NotImplementedError('%s do not process elements.', type(self))
@@ -244,7 +265,7 @@ class _BoundedReadEvaluator(_TransformEvaluator):
bundles = _read_values_to_bundles(reader)
return TransformResult(
- self._applied_ptransform, bundles, None, None, None)
+ self._applied_ptransform, bundles, None, None)
class _FlattenEvaluator(_TransformEvaluator):
@@ -268,7 +289,7 @@ class _FlattenEvaluator(_TransformEvaluator):
def finish_bundle(self):
bundles = [self.bundle]
return TransformResult(
- self._applied_ptransform, bundles, None, None, None)
+ self._applied_ptransform, bundles, None, None)
class _TaggedReceivers(dict):
@@ -357,7 +378,7 @@ class _ParDoEvaluator(_TransformEvaluator):
bundles = self._tagged_receivers.values()
result_counters = self._counter_factory.get_counters()
return TransformResult(
- self._applied_ptransform, bundles, None, result_counters, None,
+ self._applied_ptransform, bundles, result_counters, None,
self._tagged_receivers.undeclared_in_memory_tag_values)
@@ -375,7 +396,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
evaluation_context, applied_ptransform, input_committed_bundle,
side_inputs, scoped_metrics_container)
- @property
def _is_final_bundle(self):
return (self._execution_context.watermarks.input_watermark
== WatermarkManager.WATERMARK_POS_INF)
@@ -392,6 +412,10 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
self._applied_ptransform.transform.get_type_hints().input_types[0])
self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+ def process_timer(self, timer_firing):
+ # We do not need to emit a KeyedWorkItem to process_element().
+ pass
+
def process_element(self, element):
assert not self.global_state.get_state(
None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG)
@@ -408,7 +432,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
% element)
def finish_bundle(self):
- if self._is_final_bundle:
+ if self._is_final_bundle():
if self.global_state.get_state(
None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG):
# Ignore empty bundles after emitting output. (This may happen because
@@ -441,9 +465,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
else:
bundles = []
hold = WatermarkManager.WATERMARK_NEG_INF
+ self.global_state.set_timer(
+ None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, bundles, None, None, hold)
+ self._applied_ptransform, bundles, None, hold)
class _NativeWriteEvaluator(_TransformEvaluator):
@@ -475,6 +501,10 @@ class _NativeWriteEvaluator(_TransformEvaluator):
self.step_context = self._execution_context.get_step_context()
self.global_state = self.step_context.get_keyed_state(None)
+ def process_timer(self, timer_firing):
+ # We do not need to emit a KeyedWorkItem to process_element().
+ pass
+
def process_element(self, element):
self.global_state.add_state(
None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
@@ -500,6 +530,8 @@ class _NativeWriteEvaluator(_TransformEvaluator):
hold = WatermarkManager.WATERMARK_POS_INF
else:
hold = WatermarkManager.WATERMARK_NEG_INF
+ self.global_state.set_timer(
+ None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
return TransformResult(
- self._applied_ptransform, [], None, None, hold)
+ self._applied_ptransform, [], None, hold)
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
deleted file mode 100644
index 51593e3..0000000
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-from __future__ import absolute_import
-
-
-class TransformResult(object):
- """For internal use only; no backwards-compatibility guarantees.
-
- The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
- def __init__(self, applied_ptransform, uncommitted_output_bundles,
- timer_update, counters, watermark_hold,
- undeclared_tag_values=None):
- self.transform = applied_ptransform
- self.uncommitted_output_bundles = uncommitted_output_bundles
- # TODO: timer update is currently unused.
- self.timer_update = timer_update
- self.counters = counters
- self.watermark_hold = watermark_hold
- # Only used when caching (materializing) all values is requested.
- self.undeclared_tag_values = undeclared_tag_values
- # Populated by the TransformExecutor.
- self.logical_metric_updates = None
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
new file mode 100644
index 0000000..daaaceb
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility classes used by the DirectRunner.
+
+For internal use only. No backwards compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+
+class TransformResult(object):
+ """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+ def __init__(self, applied_ptransform, uncommitted_output_bundles,
+ counters, watermark_hold, undeclared_tag_values=None):
+ self.transform = applied_ptransform
+ self.uncommitted_output_bundles = uncommitted_output_bundles
+ self.counters = counters
+ self.watermark_hold = watermark_hold
+ # Only used when caching (materializing) all values is requested.
+ self.undeclared_tag_values = undeclared_tag_values
+ # Populated by the TransformExecutor.
+ self.logical_metric_updates = None
+
+
+class TimerFiring(object):
+ """A single instance of a fired timer."""
+
+ def __init__(self, key, window, name, time_domain, timestamp):
+ self.key = key
+ self.window = window
+ self.name = name
+ self.time_domain = time_domain
+ self.timestamp = timestamp
+
+
+class KeyedWorkItem(object):
+ """A keyed item that can either be a timer firing or a list of elements."""
+ def __init__(self, key, timer_firing=None, elements=None):
+ self.key = key
+ assert not timer_firing and elements
+ self.timer_firing = timer_firing
+ self.elements = elements
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 0d7cd4f..10d25d7 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -23,6 +23,7 @@ import threading
from apache_beam import pipeline
from apache_beam import pvalue
+from apache_beam.runners.direct.util import TimerFiring
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import TIME_GRANULARITY
@@ -36,21 +37,23 @@ class WatermarkManager(object):
WATERMARK_POS_INF = MAX_TIMESTAMP
WATERMARK_NEG_INF = MIN_TIMESTAMP
- def __init__(self, clock, root_transforms, value_to_consumers):
+ def __init__(self, clock, root_transforms, value_to_consumers,
+ transform_keyed_states):
self._clock = clock # processing time clock
- self._value_to_consumers = value_to_consumers
self._root_transforms = root_transforms
+ self._value_to_consumers = value_to_consumers
+ self._transform_keyed_states = transform_keyed_states
# AppliedPTransform -> TransformWatermarks
self._transform_to_watermarks = {}
for root_transform in root_transforms:
self._transform_to_watermarks[root_transform] = _TransformWatermarks(
- self._clock)
+ self._clock, transform_keyed_states[root_transform], root_transform)
for consumers in value_to_consumers.values():
for consumer in consumers:
self._transform_to_watermarks[consumer] = _TransformWatermarks(
- self._clock)
+ self._clock, transform_keyed_states[consumer], consumer)
for consumers in value_to_consumers.values():
for consumer in consumers:
@@ -90,16 +93,17 @@ class WatermarkManager(object):
return self._transform_to_watermarks[applied_ptransform]
def update_watermarks(self, completed_committed_bundle, applied_ptransform,
- timer_update, outputs, earliest_hold):
+ completed_timers, outputs, earliest_hold):
assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
self._update_pending(
- completed_committed_bundle, applied_ptransform, timer_update, outputs)
+ completed_committed_bundle, applied_ptransform, completed_timers,
+ outputs)
tw = self.get_watermarks(applied_ptransform)
tw.hold(earliest_hold)
self._refresh_watermarks(applied_ptransform)
def _update_pending(self, input_committed_bundle, applied_ptransform,
- timer_update, output_committed_bundles):
+ completed_timers, output_committed_bundles):
"""Updated list of pending bundles for the given AppliedPTransform."""
# Update pending elements. Filter out empty bundles. They do not impact
@@ -113,7 +117,7 @@ class WatermarkManager(object):
consumer_tw.add_pending(output)
completed_tw = self._transform_to_watermarks[applied_ptransform]
- completed_tw.update_timers(timer_update)
+ completed_tw.update_timers(completed_timers)
assert input_committed_bundle or applied_ptransform in self._root_transforms
if input_committed_bundle and input_committed_bundle.has_elements():
@@ -137,33 +141,37 @@ class WatermarkManager(object):
def extract_fired_timers(self):
all_timers = []
for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
- if tw.extract_fired_timers():
- all_timers.append(applied_ptransform)
+ fired_timers = tw.extract_fired_timers()
+ if fired_timers:
+ all_timers.append((applied_ptransform, fired_timers))
return all_timers
class _TransformWatermarks(object):
- """Tracks input and output watermarks for aan AppliedPTransform."""
+ """Tracks input and output watermarks for an AppliedPTransform."""
- def __init__(self, clock):
+ def __init__(self, clock, keyed_states, transform):
self._clock = clock
+ self._keyed_states = keyed_states
self._input_transform_watermarks = []
self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
self._pending = set() # Scheduled bundles targeted for this transform.
- self._fired_timers = False
+ self._fired_timers = set()
self._lock = threading.Lock()
+ self._label = str(transform)
+
def update_input_transform_watermarks(self, input_transform_watermarks):
with self._lock:
self._input_transform_watermarks = input_transform_watermarks
- def update_timers(self, timer_update):
+ def update_timers(self, completed_timers):
with self._lock:
- if timer_update:
- assert self._fired_timers
- self._fired_timers = False
+ for timer_firing in completed_timers:
+ print 'REMOVE', timer_firing
+ self._fired_timers.remove(timer_firing)
@property
def input_watermark(self):
@@ -233,8 +241,12 @@ class _TransformWatermarks(object):
if self._fired_timers:
return False
- should_fire = (
- self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
- self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
- self._fired_timers = should_fire
- return should_fire
+ fired_timers = []
+ for key, state in self._keyed_states.iteritems():
+ timers = state.get_timers(watermark=self._input_watermark)
+ for expired in timers:
+ window, (name, time_domain, timestamp) = expired
+ fired_timers.append(
+ TimerFiring(key, window, name, time_domain, timestamp))
+ self._fired_timers.update(fired_timers)
+ return fired_timers
http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 89c6ec5..7ff44fa 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1102,17 +1102,21 @@ class InMemoryUnmergedState(UnmergedState):
if not self.state[window]:
self.state.pop(window, None)
- def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
+ def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
expired = []
for window, timers in list(self.timers.items()):
for (name, time_domain), timestamp in list(timers.items()):
if timestamp <= watermark:
expired.append((window, (name, time_domain, timestamp)))
- del timers[(name, time_domain)]
- if not timers:
+ if clear:
+ del timers[(name, time_domain)]
+ if not timers and clear:
del self.timers[window]
return expired
+ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
+ return self.get_timers(clear=True, watermark=watermark)
+
def __repr__(self):
state_str = '\n'.join('%s: %s' % (key, dict(state))
for key, state in self.state.items())