You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/10 04:29:38 UTC

[6/9] beam git commit: Runner API translation of triggers and windowing strategies.

Runner API translation of triggers and windowing strategies.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b86e1fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b86e1fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b86e1fc

Branch: refs/heads/master
Commit: 5b86e1fc22234a7a6dd00696326fa0fae8fe7a2d
Parents: aad32b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 16:18:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        |  20 +++
 sdks/python/apache_beam/pipeline.py             |   2 +-
 sdks/python/apache_beam/transforms/core.py      |  38 +++++
 sdks/python/apache_beam/transforms/trigger.py   | 143 ++++++++++++++++++-
 .../apache_beam/transforms/trigger_test.py      |  33 +++++
 sdks/python/apache_beam/transforms/window.py    |  34 +++--
 .../apache_beam/transforms/window_test.py       |  23 ++-
 7 files changed, 272 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1d29f32..fd72af8 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -22,6 +22,8 @@ import cPickle as pickle
 import google.protobuf
 
 from apache_beam.coders import coder_impl
+from apache_beam.utils import urns
+from apache_beam.utils import proto_utils
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -182,6 +184,24 @@ class Coder(object):
             and self._dict_without_impl() == other._dict_without_impl())
     # pylint: enable=protected-access
 
+  def to_runner_api(self, context):
+    # TODO(BEAM-115): Use specialized URNs and components.
+    from apache_beam.runners.api import beam_runner_api_pb2
+    return beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(
+            spec=beam_runner_api_pb2.UrnWithParameter(
+                urn=urns.PICKLED_CODER,
+                parameter=proto_utils.pack_Any(
+                    google.protobuf.wrappers_pb2.BytesValue(
+                        value=serialize_coder(self))))))
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    any_proto = proto.spec.spec.parameter
+    bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
+    any_proto.Unpack(bytes_proto)
+    return deserialize_coder(bytes_proto.value)
+
 
 class StrUtf8Coder(Coder):
   """A coder used for reading and writing strings as UTF-8."""

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 4ec2e47..9edcf9b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -499,6 +499,6 @@ class PipelineContext(object):
 
   def to_runner_api(self):
     context_proto = beam_runner_api_pb2.Components()
-    for name, cls in self._COMPONENT_TYEPS:
+    for name, cls in self._COMPONENT_TYPES:
       getattr(self, name).populate_map(getattr(context_proto, name))
     return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7abd784..1fc63b2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -27,6 +27,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.coders import typecoders
 from apache_beam.internal import util
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -49,6 +50,7 @@ from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.utils.pipeline_options import TypeOptions
 
+
 # Type variables
 T = typehints.TypeVariable('T')
 K = typehints.TypeVariable('K')
@@ -1207,9 +1209,45 @@ class Windowing(object):
                                           self.accumulation_mode,
                                           self.output_time_fn)
 
+  def __eq__(self, other):
+    if type(self) == type(other):
+      if self._is_default and other._is_default:
+        return True
+      else:
+        return (
+            self.windowfn == other.windowfn
+            and self.triggerfn == other.triggerfn
+            and self.accumulation_mode == other.accumulation_mode
+            and self.output_time_fn == other.output_time_fn)
+
   def is_default(self):
     return self._is_default
 
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.WindowingStrategy(
+        window_fn=self.windowfn.to_runner_api(context),
+        # TODO(robertwb): Prohibit implicit multi-level merging.
+        merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+                      if self.windowfn.is_merging()
+                      else beam_runner_api_pb2.NON_MERGING),
+        window_coder_id=context.coders.get_id(
+            self.windowfn.get_window_coder()),
+        trigger=self.triggerfn.to_runner_api(context),
+        accumulation_mode=self.accumulation_mode,
+        output_time=self.output_time_fn,
+        closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+        allowed_lateness=0)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.transforms.trigger import TriggerFn
+    return Windowing(
+        windowfn=WindowFn.from_runner_api(proto.window_fn, context),
+        triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
+        accumulation_mode=proto.accumulation_mode,
+        output_time_fn=proto.output_time)
+
 
 @typehints.with_input_types(T)
 @typehints.with_output_types(T)

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 04198ba..b55d602 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import OutputTimeFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.runners.api import beam_runner_api_pb2
 
 
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times.
   """
-  DISCARDING = 1
-  ACCUMULATING = 2
+  DISCARDING = beam_runner_api_pb2.DISCARDING
+  ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
   # RETRACTING = 3
 
@@ -185,6 +186,26 @@ class TriggerFn(object):
     pass
 # pylint: enable=unused-argument
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return {
+        'after_all': AfterAll,
+        'after_any': AfterFirst,
+        'after_each': AfterEach,
+        'after_end_of_widow': AfterWatermark,
+        # after_processing_time, after_synchronized_processing_time
+        # always
+        'default': DefaultTrigger,
+        'element_count': AfterCount,
+        # never
+        'or_finally': OrFinally,
+        'repeat': Repeatedly,
+    }[proto.WhichOneof('trigger')].from_runner_api(proto, context)
+
+  @abstractmethod
+  def to_runner_api(self, unused_context):
+    pass
+
 
 class DefaultTrigger(TriggerFn):
   """Semantically Repeatedly(AfterWatermark()), but more optimized."""
@@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn):
   def __eq__(self, other):
     return type(self) == type(other)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return DefaultTrigger()
+
+  def to_runner_api(self, unused_context):
+    return beam_runner_api_pb2.Trigger(
+        default=beam_runner_api_pb2.Trigger.Default())
+
 
 class AfterWatermark(TriggerFn):
   """Fire exactly once when the watermark passes the end of the window.
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn):
   def __repr__(self):
     qualifiers = []
     if self.early:
-      qualifiers.append('early=%s' % self.early)
+      qualifiers.append('early=%s' % self.early.underlying)
     if self.late:
-      qualifiers.append('late=%s' % self.late)
+      qualifiers.append('late=%s' % self.late.underlying)
     return 'AfterWatermark(%s)' % ', '.join(qualifiers)
 
   def is_late(self, context):
@@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn):
   def __hash__(self):
     return hash((type(self), self.early, self.late))
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return AfterWatermark(
+        early=TriggerFn.from_runner_api(
+            proto.after_end_of_widow.early_firings, context)
+        if proto.after_end_of_widow.HasField('early_firings')
+        else None,
+        late=TriggerFn.from_runner_api(
+            proto.after_end_of_widow.late_firings, context)
+        if proto.after_end_of_widow.HasField('late_firings')
+        else None)
+
+  def to_runner_api(self, context):
+    early_proto = self.early.underlying.to_runner_api(
+        context) if self.early else None
+    late_proto = self.late.underlying.to_runner_api(
+        context) if self.late else None
+    return beam_runner_api_pb2.Trigger(
+        after_end_of_widow=beam_runner_api_pb2.Trigger.AfterEndOfWindow(
+            early_firings=early_proto,
+            late_firings=late_proto))
+
 
 class AfterCount(TriggerFn):
   """Fire when there are at least count elements in this window pane."""
@@ -317,6 +368,9 @@ class AfterCount(TriggerFn):
   def __repr__(self):
     return 'AfterCount(%s)' % self.count
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.count == other.count
+
   def on_element(self, element, window, context):
     context.add_state(self.COUNT_TAG, 1)
 
@@ -333,6 +387,15 @@ class AfterCount(TriggerFn):
   def reset(self, window, context):
     context.clear_state(self.COUNT_TAG)
 
+  @staticmethod
+  def from_runner_api(proto, unused_context):
+    return AfterCount(proto.element_count.element_count)
+
+  def to_runner_api(self, unused_context):
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=self.count))
+
 
 class Repeatedly(TriggerFn):
   """Repeatedly invoke the given trigger, never finishing."""
@@ -343,6 +406,9 @@ class Repeatedly(TriggerFn):
   def __repr__(self):
     return 'Repeatedly(%s)' % self.underlying
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.underlying == other.underlying
+
   def on_element(self, element, window, context):  # get window from context?
     self.underlying.on_element(element, window, context)
 
@@ -360,6 +426,16 @@ class Repeatedly(TriggerFn):
   def reset(self, window, context):
     self.underlying.reset(window, context)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return Repeatedly(
+        TriggerFn.from_runner_api(proto.repeat.subtrigger, context))
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        repeat=beam_runner_api_pb2.Trigger.Repeat(
+            subtrigger=self.underlying.to_runner_api(context)))
+
 
 class ParallelTriggerFn(TriggerFn):
 
@@ -372,6 +448,9 @@ class ParallelTriggerFn(TriggerFn):
     return '%s(%s)' % (self.__class__.__name__,
                        ', '.join(str(t) for t in self.triggers))
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.triggers == other.triggers
+
   @abstractmethod
   def combine_op(self, trigger_results):
     pass
@@ -406,6 +485,31 @@ class ParallelTriggerFn(TriggerFn):
   def _sub_context(context, index):
     return NestedContext(context, '%d/' % index)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    subtriggers = [
+        TriggerFn.from_runner_api(subtrigger, context)
+        for subtrigger
+        in proto.after_all.subtriggers or proto.after_any.subtriggers]
+    if proto.after_all.subtriggers:
+      return AfterAll(*subtriggers)
+    else:
+      return AfterFirst(*subtriggers)
+
+  def to_runner_api(self, context):
+    subtriggers = [
+        subtrigger.to_runner_api(context) for subtrigger in self.triggers]
+    if self.combine_op == all:
+      return beam_runner_api_pb2.Trigger(
+          after_all=beam_runner_api_pb2.Trigger.AfterAll(
+              subtriggers=subtriggers))
+    elif self.combine_op == any:
+      return beam_runner_api_pb2.Trigger(
+          after_any=beam_runner_api_pb2.Trigger.AfterAny(
+              subtriggers=subtriggers))
+    else:
+      raise NotImplementedError(self)
+
 
 class AfterFirst(ParallelTriggerFn):
   """Fires when any subtrigger fires.
@@ -435,6 +539,9 @@ class AfterEach(TriggerFn):
     return '%s(%s)' % (self.__class__.__name__,
                        ', '.join(str(t) for t in self.triggers))
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.triggers == other.triggers
+
   def on_element(self, element, window, context):
     ix = context.get_state(self.INDEX_TAG)
     if ix < len(self.triggers):
@@ -474,12 +581,40 @@ class AfterEach(TriggerFn):
   def _sub_context(context, index):
     return NestedContext(context, '%d/' % index)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return AfterEach(*[
+        TriggerFn.from_runner_api(subtrigger, context)
+        for subtrigger in proto.after_each.subtriggers])
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        after_each=beam_runner_api_pb2.Trigger.AfterEach(
+            subtriggers=[
+                subtrigger.to_runner_api(context)
+                for subtrigger in self.triggers]))
+
 
 class OrFinally(AfterFirst):
 
   def __init__(self, body_trigger, exit_trigger):
     super(OrFinally, self).__init__(body_trigger, exit_trigger)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return OrFinally(
+        TriggerFn.from_runner_api(proto.or_finally.main, context),
+        # getattr is used as finally is a keyword in Python
+        TriggerFn.from_runner_api(getattr(proto.or_finally, 'finally'),
+                                  context))
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        or_finally=beam_runner_api_pb2.Trigger.OrFinally(
+            main=self.triggers[0].to_runner_api(context),
+            # dict keyword argument is used as finally is a keyword in Python
+            **{'finally': self.triggers[1].to_runner_api(context)}))
+
 
 class TriggerContext(object):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 72bab2e..cc9e0f5 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -38,6 +38,7 @@ from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
 from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import TriggerFn
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
@@ -380,6 +381,38 @@ class TriggerTest(unittest.TestCase):
                        range(10))
 
 
+class RunnerApiTest(unittest.TestCase):
+
+  def test_trigger_encoding(self):
+    for trigger_fn in (
+        DefaultTrigger(),
+        AfterAll(AfterCount(1), AfterCount(10)),
+        AfterFirst(AfterCount(10), AfterCount(100)),
+        AfterWatermark(early=AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+        Repeatedly(AfterCount(100)),
+        trigger.OrFinally(AfterCount(3), AfterCount(10))):
+      context = beam.pipeline.PipelineContext()
+      self.assertEqual(
+          trigger_fn,
+          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+  def test_windowing_strategy_encoding(self):
+    for trigger_fn in (
+        DefaultTrigger(),
+        AfterAll(AfterCount(1), AfterCount(10)),
+        AfterFirst(AfterCount(10), AfterCount(100)),
+        AfterEach(AfterCount(100), AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+        Repeatedly(AfterCount(100)),
+        trigger.OrFinally(AfterCount(3), AfterCount(10))):
+      context = beam.pipeline.PipelineContext()
+      self.assertEqual(
+          trigger_fn,
+          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+
 class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a562bcf..c763a96 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -70,9 +70,9 @@ from apache_beam.utils import urns
 class OutputTimeFn(object):
   """Determines how output timestamps of grouping operations are assigned."""
 
-  OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
-  OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
-  OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
+  OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
+  OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
+  OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
@@ -116,6 +116,10 @@ class WindowFn(object):
     """Returns a window that is the result of merging a set of windows."""
     raise NotImplementedError
 
+  def is_merging(self):
+    """Returns whether this WindowFn merges windows."""
+    return True
+
   def get_window_coder(self):
     return coders.WindowCoder()
 
@@ -267,7 +271,16 @@ class GlobalWindow(BoundedWindow):
     return self is other or type(self) is type(other)
 
 
-class GlobalWindows(WindowFn):
+class NonMergingWindowFn(WindowFn):
+
+  def is_merging(self):
+    return False
+
+  def merge(self, merge_context):
+    pass  # No merging.
+
+
+class GlobalWindows(NonMergingWindowFn):
   """A windowing function that assigns everything to one global window."""
 
   @classmethod
@@ -277,9 +290,6 @@ class GlobalWindows(WindowFn):
   def assign(self, assign_context):
     return [GlobalWindow()]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def get_window_coder(self):
     return coders.GlobalWindowCoder()
 
@@ -304,7 +314,7 @@ WindowFn.register_urn(
     urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
 
 
-class FixedWindows(WindowFn):
+class FixedWindows(NonMergingWindowFn):
   """A windowing function that assigns each element to one time interval.
 
   The attributes size and offset determine in what time interval a timestamp
@@ -329,9 +339,6 @@ class FixedWindows(WindowFn):
     start = timestamp - (timestamp - self.offset) % self.size
     return [IntervalWindow(start, start + self.size)]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def __eq__(self, other):
     if type(self) == type(other) == FixedWindows:
       return self.size == other.size and self.offset == other.offset
@@ -356,7 +363,7 @@ WindowFn.register_urn(
     FixedWindows.from_runner_api_parameter)
 
 
-class SlidingWindows(WindowFn):
+class SlidingWindows(NonMergingWindowFn):
   """A windowing function that assigns each element to a set of sliding windows.
 
   The attributes size and offset determine in what time interval a timestamp
@@ -384,9 +391,6 @@ class SlidingWindows(WindowFn):
     return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
             for s in range(start, start - self.size, -self.period)]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:
       return (self.size == other.size

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 821b143..c79739a 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,13 +28,17 @@ from apache_beam.transforms import Create
 from apache_beam.transforms import GroupByKey
 from apache_beam.transforms import Map
 from apache_beam.transforms import WindowInto
+from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import OutputTimeFn
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
@@ -226,7 +230,10 @@ class WindowTest(unittest.TestCase):
                 label='assert:mean')
     p.run()
 
-  def test_runner_api(self):
+
+class RunnerApiTest(unittest.TestCase):
+
+  def test_windowfn_encoding(self):
     for window_fn in (GlobalWindows(),
                       FixedWindows(37),
                       SlidingWindows(2, 389),
@@ -236,5 +243,19 @@ class WindowTest(unittest.TestCase):
           window_fn,
           WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
 
+  def test_windowing_encoding(self):
+    for windowing in (
+        Windowing(GlobalWindows()),
+        Windowing(FixedWindows(1, 3), AfterCount(6),
+                  accumulation_mode=AccumulationMode.ACCUMULATING),
+        Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
+                  output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+                  accumulation_mode=AccumulationMode.DISCARDING)):
+      context = pipeline.PipelineContext()
+      self.assertEqual(
+          windowing,
+          Windowing.from_runner_api(windowing.to_runner_api(context), context))
+
+
 if __name__ == '__main__':
   unittest.main()