You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:04:31 UTC

[05/50] beam git commit: Use state / timer API for DirectRunner timer firings

Use state / timer API for DirectRunner timer firings


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56041b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56041b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56041b78

Branch: refs/heads/gearpump-runner
Commit: 56041b7850abfbb10d4a6ff2ddecb227a0a4e7c8
Parents: 50acc6c
Author: Charles Chen <cc...@google.com>
Authored: Tue Jun 20 15:22:58 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 21 09:23:13 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/evaluation_context.py        |  3 +-
 .../apache_beam/runners/direct/executor.py      | 37 ++++++++-----
 .../runners/direct/transform_evaluator.py       | 48 +++++++++++++---
 .../runners/direct/transform_result.py          | 40 --------------
 sdks/python/apache_beam/runners/direct/util.py  | 58 ++++++++++++++++++++
 .../runners/direct/watermark_manager.py         | 56 +++++++++++--------
 sdks/python/apache_beam/transforms/trigger.py   | 10 +++-
 7 files changed, 163 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8fa8e06..976e9e8 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -148,7 +148,8 @@ class EvaluationContext(object):
     self._transform_keyed_states = self._initialize_keyed_states(
         root_transforms, value_to_consumers)
     self._watermark_manager = WatermarkManager(
-        Clock(), root_transforms, value_to_consumers)
+        Clock(), root_transforms, value_to_consumers,
+        self._transform_keyed_states)
     self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index eff2d3c..a0a3886 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -222,14 +222,14 @@ class _CompletionCallback(object):
   or for a source transform.
   """
 
-  def __init__(self, evaluation_context, all_updates, timers=None):
+  def __init__(self, evaluation_context, all_updates, timer_firings=None):
     self._evaluation_context = evaluation_context
     self._all_updates = all_updates
-    self._timers = timers
+    self._timer_firings = timer_firings or []
 
   def handle_result(self, input_committed_bundle, transform_result):
     output_committed_bundles = self._evaluation_context.handle_result(
-        input_committed_bundle, self._timers, transform_result)
+        input_committed_bundle, self._timer_firings, transform_result)
     for output_committed_bundle in output_committed_bundles:
       self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
           output_committed_bundle, None))
@@ -251,11 +251,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
   """
 
   def __init__(self, transform_evaluator_registry, evaluation_context,
-               input_bundle, applied_ptransform, completion_callback,
-               transform_evaluation_state):
+               input_bundle, fired_timers, applied_ptransform,
+               completion_callback, transform_evaluation_state):
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    self._fired_timers = fired_timers
     self._applied_ptransform = applied_ptransform
     self._completion_callback = completion_callback
     self._transform_evaluation_state = transform_evaluation_state
@@ -288,6 +289,10 @@ class TransformExecutor(_ExecutorService.CallableTask):
           self._applied_ptransform, self._input_bundle,
           side_input_values, scoped_metrics_container)
 
+      if self._fired_timers:
+        for timer_firing in self._fired_timers:
+          evaluator.process_timer_wrapper(timer_firing)
+
       if self._input_bundle:
         for value in self._input_bundle.get_elements_iterable():
           evaluator.process_element(value)
@@ -379,11 +384,11 @@ class _ExecutorServiceParallelExecutor(object):
     if committed_bundle.pcollection in self.value_to_consumers:
       consumers = self.value_to_consumers[committed_bundle.pcollection]
       for applied_ptransform in consumers:
-        self.schedule_consumption(applied_ptransform, committed_bundle,
+        self.schedule_consumption(applied_ptransform, committed_bundle, [],
                                   self.default_completion_callback)
 
   def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
-                           on_complete):
+                           fired_timers, on_complete):
     """Schedules evaluation of the given bundle with the transform."""
     assert consumer_applied_ptransform
     assert committed_bundle
@@ -397,8 +402,8 @@ class _ExecutorServiceParallelExecutor(object):
 
     transform_executor = TransformExecutor(
         self.transform_evaluator_registry, self.evaluation_context,
-        committed_bundle, consumer_applied_ptransform, on_complete,
-        transform_executor_service)
+        committed_bundle, fired_timers, consumer_applied_ptransform,
+        on_complete, transform_executor_service)
     transform_executor_service.schedule(transform_executor)
 
   class _TypedUpdateQueue(object):
@@ -527,19 +532,21 @@ class _ExecutorServiceParallelExecutor(object):
       Returns:
         True if timers fired.
       """
-      fired_timers = self._executor.evaluation_context.extract_fired_timers()
-      for applied_ptransform in fired_timers:
+      transform_fired_timers = (
+          self._executor.evaluation_context.extract_fired_timers())
+      for applied_ptransform, fired_timers in transform_fired_timers:
         # Use an empty committed bundle. just to trigger.
         empty_bundle = (
             self._executor.evaluation_context.create_empty_committed_bundle(
                 applied_ptransform.inputs[0]))
         timer_completion_callback = _CompletionCallback(
             self._executor.evaluation_context, self._executor.all_updates,
-            applied_ptransform)
+            timer_firings=fired_timers)
 
         self._executor.schedule_consumption(
-            applied_ptransform, empty_bundle, timer_completion_callback)
-      return bool(fired_timers)
+            applied_ptransform, empty_bundle, fired_timers,
+            timer_completion_callback)
+      return bool(transform_fired_timers)
 
     def _is_executing(self):
       """Returns True if there is at least one non-blocked TransformExecutor."""
@@ -582,6 +589,6 @@ class _ExecutorServiceParallelExecutor(object):
               applied_ptransform, [])
           for bundle in pending_bundles:
             self._executor.schedule_consumption(
-                applied_ptransform, bundle,
+                applied_ptransform, bundle, [],
                 self._executor.default_completion_callback)
           self._executor.node_to_pending_bundles[applied_ptransform] = []

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6e73561..e92d799 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -28,13 +28,15 @@ import apache_beam.io as io
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
-from apache_beam.runners.direct.transform_result import TransformResult
+from apache_beam.runners.direct.util import KeyedWorkItem
+from apache_beam.runners.direct.util import TransformResult
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.trigger import _CombiningValueStateTag
 from apache_beam.transforms.trigger import _ListStateTag
+from apache_beam.transforms.trigger import TimeDomain
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
@@ -199,6 +201,25 @@ class _TransformEvaluator(object):
     """Starts a new bundle."""
     pass
 
+  def process_timer_wrapper(self, timer_firing):
+    """Process timer by clearing and then calling process_timer().
+
+    This method is called with any timer firing and clears the delivered
+    timer from the keyed state and then calls process_timer().  The default
+    process_timer() implementation emits a KeyedWorkItem for the particular
+    timer and passes it to process_element().  Evaluator subclasses which
+    desire different timer delivery semantics can override process_timer().
+    """
+    state = self.step_context.get_keyed_state(timer_firing.key)
+    state.clear_timer(
+        timer_firing.window, timer_firing.name, timer_firing.time_domain)
+    self.process_timer(timer_firing)
+
+  def process_timer(self, timer_firing):
+    """Default process_timer() impl. generating KeyedWorkItem element."""
+    self.process_element(
+        KeyedWorkItem(timer_firing.key, timer_firing=timer_firing))
+
   def process_element(self, element):
     """Processes a new element as part of the current bundle."""
     raise NotImplementedError('%s do not process elements.', type(self))
@@ -244,7 +265,7 @@ class _BoundedReadEvaluator(_TransformEvaluator):
         bundles = _read_values_to_bundles(reader)
 
     return TransformResult(
-        self._applied_ptransform, bundles, None, None, None)
+        self._applied_ptransform, bundles, None, None)
 
 
 class _FlattenEvaluator(_TransformEvaluator):
@@ -268,7 +289,7 @@ class _FlattenEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     bundles = [self.bundle]
     return TransformResult(
-        self._applied_ptransform, bundles, None, None, None)
+        self._applied_ptransform, bundles, None, None)
 
 
 class _TaggedReceivers(dict):
@@ -357,7 +378,7 @@ class _ParDoEvaluator(_TransformEvaluator):
     bundles = self._tagged_receivers.values()
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
-        self._applied_ptransform, bundles, None, result_counters, None,
+        self._applied_ptransform, bundles, result_counters, None,
         self._tagged_receivers.undeclared_in_memory_tag_values)
 
 
@@ -375,7 +396,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         evaluation_context, applied_ptransform, input_committed_bundle,
         side_inputs, scoped_metrics_container)
 
-  @property
   def _is_final_bundle(self):
     return (self._execution_context.watermarks.input_watermark
             == WatermarkManager.WATERMARK_POS_INF)
@@ -392,6 +412,10 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         self._applied_ptransform.transform.get_type_hints().input_types[0])
     self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
 
+  def process_timer(self, timer_firing):
+    # We do not need to emit a KeyedWorkItem to process_element().
+    pass
+
   def process_element(self, element):
     assert not self.global_state.get_state(
         None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG)
@@ -408,7 +432,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
                            % element)
 
   def finish_bundle(self):
-    if self._is_final_bundle:
+    if self._is_final_bundle():
       if self.global_state.get_state(
           None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG):
         # Ignore empty bundles after emitting output. (This may happen because
@@ -441,9 +465,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
     else:
       bundles = []
       hold = WatermarkManager.WATERMARK_NEG_INF
+      self.global_state.set_timer(
+          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, bundles, None, None, hold)
+        self._applied_ptransform, bundles, None, hold)
 
 
 class _NativeWriteEvaluator(_TransformEvaluator):
@@ -475,6 +501,10 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     self.step_context = self._execution_context.get_step_context()
     self.global_state = self.step_context.get_keyed_state(None)
 
+  def process_timer(self, timer_firing):
+    # We do not need to emit a KeyedWorkItem to process_element().
+    pass
+
   def process_element(self, element):
     self.global_state.add_state(
         None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
@@ -500,6 +530,8 @@ class _NativeWriteEvaluator(_TransformEvaluator):
       hold = WatermarkManager.WATERMARK_POS_INF
     else:
       hold = WatermarkManager.WATERMARK_NEG_INF
+      self.global_state.set_timer(
+          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, [], None, None, hold)
+        self._applied_ptransform, [], None, hold)

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
deleted file mode 100644
index 51593e3..0000000
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-from __future__ import absolute_import
-
-
-class TransformResult(object):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-  def __init__(self, applied_ptransform, uncommitted_output_bundles,
-               timer_update, counters, watermark_hold,
-               undeclared_tag_values=None):
-    self.transform = applied_ptransform
-    self.uncommitted_output_bundles = uncommitted_output_bundles
-    # TODO: timer update is currently unused.
-    self.timer_update = timer_update
-    self.counters = counters
-    self.watermark_hold = watermark_hold
-    # Only used when caching (materializing) all values is requested.
-    self.undeclared_tag_values = undeclared_tag_values
-    # Populated by the TransformExecutor.
-    self.logical_metric_updates = None

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
new file mode 100644
index 0000000..daaaceb
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility classes used by the DirectRunner.
+
+For internal use only. No backwards compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+
+class TransformResult(object):
+  """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+  def __init__(self, applied_ptransform, uncommitted_output_bundles,
+               counters, watermark_hold, undeclared_tag_values=None):
+    self.transform = applied_ptransform
+    self.uncommitted_output_bundles = uncommitted_output_bundles
+    self.counters = counters
+    self.watermark_hold = watermark_hold
+    # Only used when caching (materializing) all values is requested.
+    self.undeclared_tag_values = undeclared_tag_values
+    # Populated by the TransformExecutor.
+    self.logical_metric_updates = None
+
+
+class TimerFiring(object):
+  """A single instance of a fired timer."""
+
+  def __init__(self, key, window, name, time_domain, timestamp):
+    self.key = key
+    self.window = window
+    self.name = name
+    self.time_domain = time_domain
+    self.timestamp = timestamp
+
+
+class KeyedWorkItem(object):
+  """A keyed item that can either be a timer firing or a list of elements."""
+  def __init__(self, key, timer_firing=None, elements=None):
+    self.key = key
+    assert not (timer_firing and elements)  # The two payloads are exclusive.
+    self.timer_firing = timer_firing
+    self.elements = elements

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 0d7cd4f..10d25d7 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -23,6 +23,7 @@ import threading
 
 from apache_beam import pipeline
 from apache_beam import pvalue
+from apache_beam.runners.direct.util import TimerFiring
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import TIME_GRANULARITY
@@ -36,21 +37,23 @@ class WatermarkManager(object):
   WATERMARK_POS_INF = MAX_TIMESTAMP
   WATERMARK_NEG_INF = MIN_TIMESTAMP
 
-  def __init__(self, clock, root_transforms, value_to_consumers):
+  def __init__(self, clock, root_transforms, value_to_consumers,
+               transform_keyed_states):
     self._clock = clock  # processing time clock
-    self._value_to_consumers = value_to_consumers
     self._root_transforms = root_transforms
+    self._value_to_consumers = value_to_consumers
+    self._transform_keyed_states = transform_keyed_states
     # AppliedPTransform -> TransformWatermarks
     self._transform_to_watermarks = {}
 
     for root_transform in root_transforms:
       self._transform_to_watermarks[root_transform] = _TransformWatermarks(
-          self._clock)
+          self._clock, transform_keyed_states[root_transform], root_transform)
 
     for consumers in value_to_consumers.values():
       for consumer in consumers:
         self._transform_to_watermarks[consumer] = _TransformWatermarks(
-            self._clock)
+            self._clock, transform_keyed_states[consumer], consumer)
 
     for consumers in value_to_consumers.values():
       for consumer in consumers:
@@ -90,16 +93,17 @@ class WatermarkManager(object):
     return self._transform_to_watermarks[applied_ptransform]
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
-                        timer_update, outputs, earliest_hold):
+                        completed_timers, outputs, earliest_hold):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
     self._update_pending(
-        completed_committed_bundle, applied_ptransform, timer_update, outputs)
+        completed_committed_bundle, applied_ptransform, completed_timers,
+        outputs)
     tw = self.get_watermarks(applied_ptransform)
     tw.hold(earliest_hold)
     self._refresh_watermarks(applied_ptransform)
 
   def _update_pending(self, input_committed_bundle, applied_ptransform,
-                      timer_update, output_committed_bundles):
+                      completed_timers, output_committed_bundles):
     """Updated list of pending bundles for the given AppliedPTransform."""
 
     # Update pending elements. Filter out empty bundles. They do not impact
@@ -113,7 +117,7 @@ class WatermarkManager(object):
             consumer_tw.add_pending(output)
 
     completed_tw = self._transform_to_watermarks[applied_ptransform]
-    completed_tw.update_timers(timer_update)
+    completed_tw.update_timers(completed_timers)
 
     assert input_committed_bundle or applied_ptransform in self._root_transforms
     if input_committed_bundle and input_committed_bundle.has_elements():
@@ -137,33 +141,37 @@ class WatermarkManager(object):
   def extract_fired_timers(self):
     all_timers = []
     for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
-      if tw.extract_fired_timers():
-        all_timers.append(applied_ptransform)
+      fired_timers = tw.extract_fired_timers()
+      if fired_timers:
+        all_timers.append((applied_ptransform, fired_timers))
     return all_timers
 
 
 class _TransformWatermarks(object):
-  """Tracks input and output watermarks for aan AppliedPTransform."""
+  """Tracks input and output watermarks for an AppliedPTransform."""
 
-  def __init__(self, clock):
+  def __init__(self, clock, keyed_states, transform):
     self._clock = clock
+    self._keyed_states = keyed_states
     self._input_transform_watermarks = []
     self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
     self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
     self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
     self._pending = set()  # Scheduled bundles targeted for this transform.
-    self._fired_timers = False
+    self._fired_timers = set()
     self._lock = threading.Lock()
 
+    self._label = str(transform)
+
   def update_input_transform_watermarks(self, input_transform_watermarks):
     with self._lock:
       self._input_transform_watermarks = input_transform_watermarks
 
-  def update_timers(self, timer_update):
+  def update_timers(self, completed_timers):
     with self._lock:
-      if timer_update:
-        assert self._fired_timers
-        self._fired_timers = False
+      # Completed firings are no longer outstanding, so stop tracking them.
+      for timer_firing in completed_timers:
+        self._fired_timers.remove(timer_firing)
 
   @property
   def input_watermark(self):
@@ -233,8 +241,12 @@ class _TransformWatermarks(object):
       if self._fired_timers:
         return False
 
-      should_fire = (
-          self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
-          self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
-      self._fired_timers = should_fire
-      return should_fire
+      fired_timers = []
+      for key, state in self._keyed_states.iteritems():
+        timers = state.get_timers(watermark=self._input_watermark)
+        for expired in timers:
+          window, (name, time_domain, timestamp) = expired
+          fired_timers.append(
+              TimerFiring(key, window, name, time_domain, timestamp))
+      self._fired_timers.update(fired_timers)
+      return fired_timers

http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 89c6ec5..7ff44fa 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1102,17 +1102,21 @@ class InMemoryUnmergedState(UnmergedState):
     if not self.state[window]:
       self.state.pop(window, None)
 
-  def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
+  def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
     expired = []
     for window, timers in list(self.timers.items()):
       for (name, time_domain), timestamp in list(timers.items()):
         if timestamp <= watermark:
           expired.append((window, (name, time_domain, timestamp)))
-          del timers[(name, time_domain)]
-      if not timers:
+          if clear:
+            del timers[(name, time_domain)]
+      if not timers and clear:
         del self.timers[window]
     return expired
 
+  def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
+    return self.get_timers(clear=True, watermark=watermark)
+
   def __repr__(self):
     state_str = '\n'.join('%s: %s' % (key, dict(state))
                           for key, state in self.state.items())