You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:42 UTC
[11/50] [abbrv] beam git commit: Runner API translation of triggers and windowing strategies.
Runner API translation of triggers and windowing strategies.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b86e1fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b86e1fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b86e1fc
Branch: refs/heads/gearpump-runner
Commit: 5b86e1fc22234a7a6dd00696326fa0fae8fe7a2d
Parents: aad32b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 16:18:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 20 +++
sdks/python/apache_beam/pipeline.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 38 +++++
sdks/python/apache_beam/transforms/trigger.py | 143 ++++++++++++++++++-
.../apache_beam/transforms/trigger_test.py | 33 +++++
sdks/python/apache_beam/transforms/window.py | 34 +++--
.../apache_beam/transforms/window_test.py | 23 ++-
7 files changed, 272 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1d29f32..fd72af8 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -22,6 +22,8 @@ import cPickle as pickle
import google.protobuf
from apache_beam.coders import coder_impl
+from apache_beam.utils import urns
+from apache_beam.utils import proto_utils
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -182,6 +184,24 @@ class Coder(object):
and self._dict_without_impl() == other._dict_without_impl())
# pylint: enable=protected-access
+ def to_runner_api(self, context):
+ # TODO(BEAM-115): Use specialized URNs and components.
+ from apache_beam.runners.api import beam_runner_api_pb2
+ return beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ spec=beam_runner_api_pb2.UrnWithParameter(
+ urn=urns.PICKLED_CODER,
+ parameter=proto_utils.pack_Any(
+ google.protobuf.wrappers_pb2.BytesValue(
+ value=serialize_coder(self))))))
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ any_proto = proto.spec.spec.parameter
+ bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
+ any_proto.Unpack(bytes_proto)
+ return deserialize_coder(bytes_proto.value)
+
class StrUtf8Coder(Coder):
"""A coder used for reading and writing strings as UTF-8."""
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 4ec2e47..9edcf9b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -499,6 +499,6 @@ class PipelineContext(object):
def to_runner_api(self):
context_proto = beam_runner_api_pb2.Components()
- for name, cls in self._COMPONENT_TYEPS:
+ for name, cls in self._COMPONENT_TYPES:
getattr(self, name).populate_map(getattr(context_proto, name))
return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7abd784..1fc63b2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -27,6 +27,7 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.coders import typecoders
from apache_beam.internal import util
+from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
@@ -49,6 +50,7 @@ from apache_beam.typehints import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.utils.pipeline_options import TypeOptions
+
# Type variables
T = typehints.TypeVariable('T')
K = typehints.TypeVariable('K')
@@ -1207,9 +1209,45 @@ class Windowing(object):
self.accumulation_mode,
self.output_time_fn)
+ def __eq__(self, other):
+ if type(self) == type(other):
+ if self._is_default and other._is_default:
+ return True
+ else:
+ return (
+ self.windowfn == other.windowfn
+ and self.triggerfn == other.triggerfn
+ and self.accumulation_mode == other.accumulation_mode
+ and self.output_time_fn == other.output_time_fn)
+
def is_default(self):
return self._is_default
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.WindowingStrategy(
+ window_fn=self.windowfn.to_runner_api(context),
+ # TODO(robertwb): Prohibit implicit multi-level merging.
+ merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+ if self.windowfn.is_merging()
+ else beam_runner_api_pb2.NON_MERGING),
+ window_coder_id=context.coders.get_id(
+ self.windowfn.get_window_coder()),
+ trigger=self.triggerfn.to_runner_api(context),
+ accumulation_mode=self.accumulation_mode,
+ output_time=self.output_time_fn,
+ closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+ allowed_lateness=0)
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.transforms.trigger import TriggerFn
+ return Windowing(
+ windowfn=WindowFn.from_runner_api(proto.window_fn, context),
+ triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
+ accumulation_mode=proto.accumulation_mode,
+ output_time_fn=proto.output_time)
+
@typehints.with_input_types(T)
@typehints.with_output_types(T)
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 04198ba..b55d602 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import OutputTimeFn
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
+from apache_beam.runners.api import beam_runner_api_pb2
class AccumulationMode(object):
"""Controls what to do with data when a trigger fires multiple times.
"""
- DISCARDING = 1
- ACCUMULATING = 2
+ DISCARDING = beam_runner_api_pb2.DISCARDING
+ ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
# TODO(robertwb): Provide retractions of previous outputs.
# RETRACTING = 3
@@ -185,6 +186,26 @@ class TriggerFn(object):
pass
# pylint: enable=unused-argument
+ @staticmethod
+ def from_runner_api(proto, context):
+ return {
+ 'after_all': AfterAll,
+ 'after_any': AfterFirst,
+ 'after_each': AfterEach,
+ 'after_end_of_widow': AfterWatermark,
+ # after_processing_time, after_synchronized_processing_time
+ # always
+ 'default': DefaultTrigger,
+ 'element_count': AfterCount,
+ # never
+ 'or_finally': OrFinally,
+ 'repeat': Repeatedly,
+ }[proto.WhichOneof('trigger')].from_runner_api(proto, context)
+
+ @abstractmethod
+ def to_runner_api(self, unused_context):
+ pass
+
class DefaultTrigger(TriggerFn):
"""Semantically Repeatedly(AfterWatermark()), but more optimized."""
@@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn):
def __eq__(self, other):
return type(self) == type(other)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return DefaultTrigger()
+
+ def to_runner_api(self, unused_context):
+ return beam_runner_api_pb2.Trigger(
+ default=beam_runner_api_pb2.Trigger.Default())
+
class AfterWatermark(TriggerFn):
"""Fire exactly once when the watermark passes the end of the window.
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn):
def __repr__(self):
qualifiers = []
if self.early:
- qualifiers.append('early=%s' % self.early)
+ qualifiers.append('early=%s' % self.early.underlying)
if self.late:
- qualifiers.append('late=%s' % self.late)
+ qualifiers.append('late=%s' % self.late.underlying)
return 'AfterWatermark(%s)' % ', '.join(qualifiers)
def is_late(self, context):
@@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn):
def __hash__(self):
return hash((type(self), self.early, self.late))
+ @staticmethod
+ def from_runner_api(proto, context):
+ return AfterWatermark(
+ early=TriggerFn.from_runner_api(
+ proto.after_end_of_widow.early_firings, context)
+ if proto.after_end_of_widow.HasField('early_firings')
+ else None,
+ late=TriggerFn.from_runner_api(
+ proto.after_end_of_widow.late_firings, context)
+ if proto.after_end_of_widow.HasField('late_firings')
+ else None)
+
+ def to_runner_api(self, context):
+ early_proto = self.early.underlying.to_runner_api(
+ context) if self.early else None
+ late_proto = self.late.underlying.to_runner_api(
+ context) if self.late else None
+ return beam_runner_api_pb2.Trigger(
+ after_end_of_widow=beam_runner_api_pb2.Trigger.AfterEndOfWindow(
+ early_firings=early_proto,
+ late_firings=late_proto))
+
class AfterCount(TriggerFn):
"""Fire when there are at least count elements in this window pane."""
@@ -317,6 +368,9 @@ class AfterCount(TriggerFn):
def __repr__(self):
return 'AfterCount(%s)' % self.count
+ def __eq__(self, other):
+ return type(self) == type(other) and self.count == other.count
+
def on_element(self, element, window, context):
context.add_state(self.COUNT_TAG, 1)
@@ -333,6 +387,15 @@ class AfterCount(TriggerFn):
def reset(self, window, context):
context.clear_state(self.COUNT_TAG)
+ @staticmethod
+ def from_runner_api(proto, unused_context):
+ return AfterCount(proto.element_count.element_count)
+
+ def to_runner_api(self, unused_context):
+ return beam_runner_api_pb2.Trigger(
+ element_count=beam_runner_api_pb2.Trigger.ElementCount(
+ element_count=self.count))
+
class Repeatedly(TriggerFn):
"""Repeatedly invoke the given trigger, never finishing."""
@@ -343,6 +406,9 @@ class Repeatedly(TriggerFn):
def __repr__(self):
return 'Repeatedly(%s)' % self.underlying
+ def __eq__(self, other):
+ return type(self) == type(other) and self.underlying == other.underlying
+
def on_element(self, element, window, context): # get window from context?
self.underlying.on_element(element, window, context)
@@ -360,6 +426,16 @@ class Repeatedly(TriggerFn):
def reset(self, window, context):
self.underlying.reset(window, context)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return Repeatedly(
+ TriggerFn.from_runner_api(proto.repeat.subtrigger, context))
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ repeat=beam_runner_api_pb2.Trigger.Repeat(
+ subtrigger=self.underlying.to_runner_api(context)))
+
class ParallelTriggerFn(TriggerFn):
@@ -372,6 +448,9 @@ class ParallelTriggerFn(TriggerFn):
return '%s(%s)' % (self.__class__.__name__,
', '.join(str(t) for t in self.triggers))
+ def __eq__(self, other):
+ return type(self) == type(other) and self.triggers == other.triggers
+
@abstractmethod
def combine_op(self, trigger_results):
pass
@@ -406,6 +485,31 @@ class ParallelTriggerFn(TriggerFn):
def _sub_context(context, index):
return NestedContext(context, '%d/' % index)
+ @staticmethod
+ def from_runner_api(proto, context):
+ subtriggers = [
+ TriggerFn.from_runner_api(subtrigger, context)
+ for subtrigger
+ in proto.after_all.subtriggers or proto.after_any.subtriggers]
+ if proto.after_all.subtriggers:
+ return AfterAll(*subtriggers)
+ else:
+ return AfterFirst(*subtriggers)
+
+ def to_runner_api(self, context):
+ subtriggers = [
+ subtrigger.to_runner_api(context) for subtrigger in self.triggers]
+ if self.combine_op == all:
+ return beam_runner_api_pb2.Trigger(
+ after_all=beam_runner_api_pb2.Trigger.AfterAll(
+ subtriggers=subtriggers))
+ elif self.combine_op == any:
+ return beam_runner_api_pb2.Trigger(
+ after_any=beam_runner_api_pb2.Trigger.AfterAny(
+ subtriggers=subtriggers))
+ else:
+ raise NotImplementedError(self)
+
class AfterFirst(ParallelTriggerFn):
"""Fires when any subtrigger fires.
@@ -435,6 +539,9 @@ class AfterEach(TriggerFn):
return '%s(%s)' % (self.__class__.__name__,
', '.join(str(t) for t in self.triggers))
+ def __eq__(self, other):
+ return type(self) == type(other) and self.triggers == other.triggers
+
def on_element(self, element, window, context):
ix = context.get_state(self.INDEX_TAG)
if ix < len(self.triggers):
@@ -474,12 +581,40 @@ class AfterEach(TriggerFn):
def _sub_context(context, index):
return NestedContext(context, '%d/' % index)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return AfterEach(*[
+ TriggerFn.from_runner_api(subtrigger, context)
+ for subtrigger in proto.after_each.subtriggers])
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ after_each=beam_runner_api_pb2.Trigger.AfterEach(
+ subtriggers=[
+ subtrigger.to_runner_api(context)
+ for subtrigger in self.triggers]))
+
class OrFinally(AfterFirst):
def __init__(self, body_trigger, exit_trigger):
super(OrFinally, self).__init__(body_trigger, exit_trigger)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return OrFinally(
+ TriggerFn.from_runner_api(proto.or_finally.main, context),
+ # getattr is used as finally is a keyword in Python
+ TriggerFn.from_runner_api(getattr(proto.or_finally, 'finally'),
+ context))
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ or_finally=beam_runner_api_pb2.Trigger.OrFinally(
+ main=self.triggers[0].to_runner_api(context),
+ # dict keyword argument is used as finally is a keyword in Python
+ **{'finally': self.triggers[1].to_runner_api(context)}))
+
class TriggerContext(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 72bab2e..cc9e0f5 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -38,6 +38,7 @@ from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.transforms.trigger import GeneralTriggerDriver
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import TriggerFn
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
@@ -380,6 +381,38 @@ class TriggerTest(unittest.TestCase):
range(10))
+class RunnerApiTest(unittest.TestCase):
+
+ def test_trigger_encoding(self):
+ for trigger_fn in (
+ DefaultTrigger(),
+ AfterAll(AfterCount(1), AfterCount(10)),
+ AfterFirst(AfterCount(10), AfterCount(100)),
+ AfterWatermark(early=AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+ Repeatedly(AfterCount(100)),
+ trigger.OrFinally(AfterCount(3), AfterCount(10))):
+ context = beam.pipeline.PipelineContext()
+ self.assertEqual(
+ trigger_fn,
+ TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+ def test_windowing_strategy_encoding(self):
+ for trigger_fn in (
+ DefaultTrigger(),
+ AfterAll(AfterCount(1), AfterCount(10)),
+ AfterFirst(AfterCount(10), AfterCount(100)),
+ AfterEach(AfterCount(100), AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+ Repeatedly(AfterCount(100)),
+ trigger.OrFinally(AfterCount(3), AfterCount(10))):
+ context = beam.pipeline.PipelineContext()
+ self.assertEqual(
+ trigger_fn,
+ TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+
class TriggerPipelineTest(unittest.TestCase):
def test_after_count(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a562bcf..c763a96 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -70,9 +70,9 @@ from apache_beam.utils import urns
class OutputTimeFn(object):
"""Determines how output timestamps of grouping operations are assigned."""
- OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
- OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
- OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
+ OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
+ OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
+ OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
@staticmethod
@@ -116,6 +116,10 @@ class WindowFn(object):
"""Returns a window that is the result of merging a set of windows."""
raise NotImplementedError
+ def is_merging(self):
+ """Returns whether this WindowFn merges windows."""
+ return True
+
def get_window_coder(self):
return coders.WindowCoder()
@@ -267,7 +271,16 @@ class GlobalWindow(BoundedWindow):
return self is other or type(self) is type(other)
-class GlobalWindows(WindowFn):
+class NonMergingWindowFn(WindowFn):
+
+ def is_merging(self):
+ return False
+
+ def merge(self, merge_context):
+ pass # No merging.
+
+
+class GlobalWindows(NonMergingWindowFn):
"""A windowing function that assigns everything to one global window."""
@classmethod
@@ -277,9 +290,6 @@ class GlobalWindows(WindowFn):
def assign(self, assign_context):
return [GlobalWindow()]
- def merge(self, merge_context):
- pass # No merging.
-
def get_window_coder(self):
return coders.GlobalWindowCoder()
@@ -304,7 +314,7 @@ WindowFn.register_urn(
urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
-class FixedWindows(WindowFn):
+class FixedWindows(NonMergingWindowFn):
"""A windowing function that assigns each element to one time interval.
The attributes size and offset determine in what time interval a timestamp
@@ -329,9 +339,6 @@ class FixedWindows(WindowFn):
start = timestamp - (timestamp - self.offset) % self.size
return [IntervalWindow(start, start + self.size)]
- def merge(self, merge_context):
- pass # No merging.
-
def __eq__(self, other):
if type(self) == type(other) == FixedWindows:
return self.size == other.size and self.offset == other.offset
@@ -356,7 +363,7 @@ WindowFn.register_urn(
FixedWindows.from_runner_api_parameter)
-class SlidingWindows(WindowFn):
+class SlidingWindows(NonMergingWindowFn):
"""A windowing function that assigns each element to a set of sliding windows.
The attributes size and offset determine in what time interval a timestamp
@@ -384,9 +391,6 @@ class SlidingWindows(WindowFn):
return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
for s in range(start, start - self.size, -self.period)]
- def merge(self, merge_context):
- pass # No merging.
-
def __eq__(self, other):
if type(self) == type(other) == SlidingWindows:
return (self.size == other.size
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 821b143..c79739a 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,13 +28,17 @@ from apache_beam.transforms import Create
from apache_beam.transforms import GroupByKey
from apache_beam.transforms import Map
from apache_beam.transforms import WindowInto
+from apache_beam.transforms.core import Windowing
from apache_beam.transforms.timeutil import MAX_TIMESTAMP
from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import OutputTimeFn
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
@@ -226,7 +230,10 @@ class WindowTest(unittest.TestCase):
label='assert:mean')
p.run()
- def test_runner_api(self):
+
+class RunnerApiTest(unittest.TestCase):
+
+ def test_windowfn_encoding(self):
for window_fn in (GlobalWindows(),
FixedWindows(37),
SlidingWindows(2, 389),
@@ -236,5 +243,19 @@ class WindowTest(unittest.TestCase):
window_fn,
WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
+ def test_windowing_encoding(self):
+ for windowing in (
+ Windowing(GlobalWindows()),
+ Windowing(FixedWindows(1, 3), AfterCount(6),
+ accumulation_mode=AccumulationMode.ACCUMULATING),
+ Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
+ output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+ accumulation_mode=AccumulationMode.DISCARDING)):
+ context = pipeline.PipelineContext()
+ self.assertEqual(
+ windowing,
+ Windowing.from_runner_api(windowing.to_runner_api(context), context))
+
+
if __name__ == '__main__':
unittest.main()