You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/09 16:37:40 UTC

[3/4] beam git commit: [BEAM-2184] Rename OutputTimeFn to TimestampCombiner.

[BEAM-2184] Rename OutputTimeFn to TimestampCombiner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa70944
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa70944
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa70944

Branch: refs/heads/release-2.0.0
Commit: 2aa70944ed88616e02e46c35549b74a784dfa4cc
Parents: f3f47f6
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon May 8 15:54:23 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 9 09:37:08 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py      | 23 ++++++++-------
 sdks/python/apache_beam/transforms/timeutil.py  | 22 +++++++-------
 sdks/python/apache_beam/transforms/trigger.py   | 26 ++++++++---------
 .../apache_beam/transforms/trigger_test.py      |  9 +++---
 .../transforms/trigger_transcripts.yaml         | 30 ++++++++++----------
 sdks/python/apache_beam/transforms/window.py    | 22 +++++++-------
 .../apache_beam/transforms/window_test.py       |  4 +--
 7 files changed, 69 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9367e6f..7ca1632 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -33,7 +33,7 @@ from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import PTransformWithSideInputs
 from apache_beam.transforms.window import MIN_TIMESTAMP
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import GlobalWindows
@@ -1172,7 +1172,7 @@ class Partition(PTransformWithSideInputs):
 class Windowing(object):
 
   def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
-               output_time_fn=None):
+               timestamp_combiner=None):
     global AccumulationMode, DefaultTrigger  # pylint: disable=global-variable-not-assigned
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
@@ -1192,17 +1192,18 @@ class Windowing(object):
     self.windowfn = windowfn
     self.triggerfn = triggerfn
     self.accumulation_mode = accumulation_mode
-    self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW
+    self.timestamp_combiner = (
+        timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW)
     self._is_default = (
         self.windowfn == GlobalWindows() and
         self.triggerfn == DefaultTrigger() and
         self.accumulation_mode == AccumulationMode.DISCARDING and
-        self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW)
+        self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW)
 
   def __repr__(self):
     return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
                                           self.accumulation_mode,
-                                          self.output_time_fn)
+                                          self.timestamp_combiner)
 
   def __eq__(self, other):
     if type(self) == type(other):
@@ -1212,7 +1213,7 @@ class Windowing(object):
           self.windowfn == other.windowfn
           and self.triggerfn == other.triggerfn
           and self.accumulation_mode == other.accumulation_mode
-          and self.output_time_fn == other.output_time_fn)
+          and self.timestamp_combiner == other.timestamp_combiner)
     return False
 
   def is_default(self):
@@ -1229,7 +1230,7 @@ class Windowing(object):
             self.windowfn.get_window_coder()),
         trigger=self.triggerfn.to_runner_api(context),
         accumulation_mode=self.accumulation_mode,
-        output_time=self.output_time_fn,
+        output_time=self.timestamp_combiner,
         # TODO(robertwb): Support EMIT_IF_NONEMPTY
         closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
         allowed_lateness=0)
@@ -1242,7 +1243,7 @@ class Windowing(object):
         windowfn=WindowFn.from_runner_api(proto.window_fn, context),
         triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
         accumulation_mode=proto.accumulation_mode,
-        output_time_fn=proto.output_time)
+        timestamp_combiner=proto.output_time)
 
 
 @typehints.with_input_types(T)
@@ -1275,9 +1276,9 @@ class WindowInto(ParDo):
     """
     triggerfn = kwargs.pop('trigger', None)
     accumulation_mode = kwargs.pop('accumulation_mode', None)
-    output_time_fn = kwargs.pop('output_time_fn', None)
+    timestamp_combiner = kwargs.pop('timestamp_combiner', None)
     self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
-                               output_time_fn)
+                               timestamp_combiner)
     super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
 
   def get_windowing(self, unused_inputs):
@@ -1307,7 +1308,7 @@ class WindowInto(ParDo):
         windowing.windowfn,
         trigger=windowing.triggerfn,
         accumulation_mode=windowing.accumulation_mode,
-        output_time_fn=windowing.output_time_fn)
+        timestamp_combiner=windowing.timestamp_combiner)
 
 
 PTransform.register_urn(

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 5453b20..435d41b 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -49,8 +49,8 @@ class TimeDomain(object):
     raise ValueError('Unknown time domain: %s' % domain)
 
 
-class OutputTimeFnImpl(object):
-  """Implementation of OutputTimeFn."""
+class TimestampCombinerImpl(object):
+  """Implementation of TimestampCombiner."""
 
   __metaclass__ = ABCMeta
 
@@ -78,8 +78,8 @@ class OutputTimeFnImpl(object):
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(OutputTimeFnImpl):
-  """OutputTimeFnImpl that only depends on the window."""
+class DependsOnlyOnWindow(TimestampCombinerImpl):
+  """TimestampCombinerImpl that only depends on the window."""
 
   __metaclass__ = ABCMeta
 
@@ -92,8 +92,8 @@ class DependsOnlyOnWindow(OutputTimeFnImpl):
     return self.assign_output_time(result_window, None)
 
 
-class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at earliest input timestamp."""
+class OutputAtEarliestInputTimestampImpl(TimestampCombinerImpl):
+  """TimestampCombinerImpl outputting at earliest input timestamp."""
 
   def assign_output_time(self, window, input_timestamp):
     return input_timestamp
@@ -103,8 +103,8 @@ class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
     return min(output_timestamp, other_output_timestamp)
 
 
-class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at earliest input timestamp."""
+class OutputAtEarliestTransformedInputTimestampImpl(TimestampCombinerImpl):
+  """TimestampCombinerImpl outputting at earliest input timestamp."""
 
   def __init__(self, window_fn):
     self.window_fn = window_fn
@@ -116,8 +116,8 @@ class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
     return min(output_timestamp, other_output_timestamp)
 
 
-class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at latest input timestamp."""
+class OutputAtLatestInputTimestampImpl(TimestampCombinerImpl):
+  """TimestampCombinerImpl outputting at latest input timestamp."""
 
   def assign_output_time(self, window, input_timestamp):
     return input_timestamp
@@ -127,7 +127,7 @@ class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
 
 
 class OutputAtEndOfWindowImpl(DependsOnlyOnWindow):
-  """OutputTimeFnImpl outputting at end of window."""
+  """TimestampCombinerImpl outputting at end of window."""
 
   def assign_output_time(self, window, unused_input_timestamp):
     return window.end

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index b9786f4..bcb9dd3 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -32,7 +32,7 @@ from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.window import GlobalWindow
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.runners.api import beam_runner_api_pb2
@@ -100,17 +100,17 @@ class ListStateTag(StateTag):
 
 class WatermarkHoldStateTag(StateTag):
 
-  def __init__(self, tag, output_time_fn_impl):
+  def __init__(self, tag, timestamp_combiner_impl):
     super(WatermarkHoldStateTag, self).__init__(tag)
-    self.output_time_fn_impl = output_time_fn_impl
+    self.timestamp_combiner_impl = timestamp_combiner_impl
 
   def __repr__(self):
     return 'WatermarkHoldStateTag(%s, %s)' % (self.tag,
-                                              self.output_time_fn_impl)
+                                              self.timestamp_combiner_impl)
 
   def with_prefix(self, prefix):
     return WatermarkHoldStateTag(prefix + self.tag,
-                                 self.output_time_fn_impl)
+                                 self.timestamp_combiner_impl)
 
 
 # pylint: disable=unused-argument
@@ -750,7 +750,7 @@ class MergeableStateAdapter(SimpleState):
     elif isinstance(tag, ListStateTag):
       return [v for vs in values for v in vs]
     elif isinstance(tag, WatermarkHoldStateTag):
-      return tag.output_time_fn_impl.combine_all(values)
+      return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)
 
@@ -909,11 +909,11 @@ class GeneralTriggerDriver(TriggerDriver):
 
   def __init__(self, windowing):
     self.window_fn = windowing.windowfn
-    self.output_time_fn_impl = OutputTimeFn.get_impl(windowing.output_time_fn,
-                                                     self.window_fn)
+    self.timestamp_combiner_impl = TimestampCombiner.get_impl(
+        windowing.timestamp_combiner, self.window_fn)
     # pylint: disable=invalid-name
-    self.WATERMARK_HOLD = WatermarkHoldStateTag('watermark',
-                                                self.output_time_fn_impl)
+    self.WATERMARK_HOLD = WatermarkHoldStateTag(
+        'watermark', self.timestamp_combiner_impl)
     # pylint: enable=invalid-name
     self.trigger_fn = windowing.triggerfn
     self.accumulation_mode = windowing.accumulation_mode
@@ -965,10 +965,10 @@ class GeneralTriggerDriver(TriggerDriver):
         continue
       # Add watermark hold.
       # TODO(ccy): Add late data and garbage-collection hold support.
-      output_time = self.output_time_fn_impl.merge(
+      output_time = self.timestamp_combiner_impl.merge(
           window,
           (element_output_time for element_output_time in
-           (self.output_time_fn_impl.assign_output_time(window, timestamp)
+           (self.timestamp_combiner_impl.assign_output_time(window, timestamp)
             for unused_value, timestamp in elements)
            if element_output_time >= output_watermark))
       if output_time is not None:
@@ -1075,7 +1075,7 @@ class InMemoryUnmergedState(UnmergedState):
     elif isinstance(tag, ListStateTag):
       return values
     elif isinstance(tag, WatermarkHoldStateTag):
-      return tag.output_time_fn_impl.combine_all(values)
+      return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 914babb..38871fe 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -44,7 +44,7 @@ from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import MIN_TIMESTAMP
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowedValue
@@ -522,11 +522,12 @@ class TranscriptTest(unittest.TestCase):
     trigger_fn = parse_fn(spec.get('trigger_fn', 'Default'), trigger_names)
     accumulation_mode = getattr(
         AccumulationMode, spec.get('accumulation_mode', 'ACCUMULATING').upper())
-    output_time_fn = getattr(
-        OutputTimeFn, spec.get('output_time_fn', 'OUTPUT_AT_EOW').upper())
+    timestamp_combiner = getattr(
+        TimestampCombiner,
+        spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper())
 
     driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode, output_time_fn))
+        Windowing(window_fn, trigger_fn, accumulation_mode, timestamp_combiner))
     state = InMemoryUnmergedState()
     output = []
     watermark = MIN_TIMESTAMP

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
index f87cd1d..a736e94 100644
--- a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
+++ b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
@@ -29,7 +29,7 @@ transcript:                       # Ordered list of events.
 name: fixed_default_late_data
 window_fn: FixedWindows(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -42,10 +42,10 @@ transcript:
       - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 10, late: true}
 
 ---
-name: output_time_fn_earliest
+name: timestamp_combiner_earliest
 window_fn: FixedWindows(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -55,10 +55,10 @@ transcript:
       - {window: [20, 29], values: [25], timestamp: 25, late: false}
 
 ---
-name: output_time_fn_latest
+name: timestamp_combiner_latest
 window_fn: FixedWindows(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_LATEST
+timestamp_combiner: OUTPUT_AT_LATEST
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -69,10 +69,10 @@ transcript:
 
 ---
 # Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_eow
+name: timestamp_combiner_custom_timestamping_eow
 window_fn: CustomTimestampingFixedWindowsWindowFn(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -83,10 +83,10 @@ transcript:
 
 ---
 # Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_earliest
+name: timestamp_combiner_custom_timestamping_earliest
 window_fn: CustomTimestampingFixedWindowsWindowFn(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -97,10 +97,10 @@ transcript:
 
 ---
 # Test that custom timestamping is in fact invoked.
-name: output_time_fn_custom_timestamping_earliest
+name: timestamp_combiner_custom_timestamping_earliest
 window_fn: CustomTimestampingFixedWindowsWindowFn(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST_TRANSFORMED
+timestamp_combiner: OUTPUT_AT_EARLIEST_TRANSFORMED
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -113,7 +113,7 @@ transcript:
 name: early_late_sessions
 window_fn: Sessions(10)
 trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
 transcript:
     - input: [1, 2, 3]
     - expect:
@@ -136,7 +136,7 @@ transcript:
 name: garbage_collection
 window_fn: FixedWindows(10)
 trigger_fn: AfterCount(2)
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
 allowed_lateness: 10
 accumulation_mode: discarding
 transcript:
@@ -153,7 +153,7 @@ transcript:
 name: known_late_data_watermark
 window_fn: FixedWindows(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
 transcript:
   - watermark: 5
   - input: [2, 3, 7, 8]
@@ -165,7 +165,7 @@ transcript:
 name: known_late_data_no_watermark_hold_possible
 window_fn: FixedWindows(10)
 trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
 transcript:
   - watermark: 8
   - input: [2, 3, 7]

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 9c4b109..44a5a26 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -67,7 +67,7 @@ from apache_beam.utils import urns
 
 # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
 # behavior.
-class OutputTimeFn(object):
+class TimestampCombiner(object):
   """Determines how output timestamps of grouping operations are assigned."""
 
   OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
@@ -77,17 +77,17 @@ class OutputTimeFn(object):
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
-  def get_impl(output_time_fn, window_fn):
-    if output_time_fn == OutputTimeFn.OUTPUT_AT_EOW:
+  def get_impl(timestamp_combiner, window_fn):
+    if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW:
       return timeutil.OutputAtEndOfWindowImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST:
+    elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST:
       return timeutil.OutputAtEarliestInputTimestampImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_LATEST:
+    elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_LATEST:
       return timeutil.OutputAtLatestInputTimestampImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED:
+    elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED:
       return timeutil.OutputAtEarliestTransformedInputTimestampImpl(window_fn)
     else:
-      raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
+      raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
 class WindowFn(urns.RunnerApiFn):
@@ -132,10 +132,10 @@ class WindowFn(urns.RunnerApiFn):
   def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
     """Given input time and output window, returns output time for window.
 
-    If OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the Windowing,
-    the output timestamp for the given window will be the earliest of the
-    timestamps returned by get_transformed_output_time() for elements of the
-    window.
+    If TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the
+    Windowing, the output timestamp for the given window will be the earliest
+    of the timestamps returned by get_transformed_output_time() for elements
+    of the window.
 
     Arguments:
       window: Output window of element.

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 38a2df8..0f613d7 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -38,7 +38,7 @@ from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
@@ -271,7 +271,7 @@ class RunnerApiTest(unittest.TestCase):
         Windowing(FixedWindows(1, 3), AfterCount(6),
                   accumulation_mode=AccumulationMode.ACCUMULATING),
         Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
-                  output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+                  timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
                   accumulation_mode=AccumulationMode.DISCARDING)):
       context = pipeline_context.PipelineContext()
       self.assertEqual(