You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/10 04:29:33 UTC

[1/9] beam git commit: Runner API context helper classes.

Repository: beam
Updated Branches:
  refs/heads/master f13a84d67 -> 2c2424cb4


Runner API context helper classes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186

Branch: refs/heads/master
Commit: bc76a186099568ef292ceb007388ae7174150bc2
Parents: 3bb125e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:04:27 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:00 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 62 ++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bc76a186/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7db39a9..4ec2e47 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,11 +52,14 @@ import os
 import shutil
 import tempfile
 
+from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -440,3 +443,62 @@ class AppliedPTransform(object):
         if v not in visited:
           visited.add(v)
           visitor.visit_value(v, self)
+
+
+class PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    self._id_to_proto = proto_map if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    self._counter += 1
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+    for id, obj in self._id_to_obj:
+      proto_map[id] = self._id_to_proto[id]
+
+  def get_id(self, obj):
+    if obj not in self._obj_to_id:
+      id = self._unique_ref()
+      self._id_to_obj[id] = obj
+      self._obj_to_id[obj] = id
+      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+        self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+
+  _COMPONENT_TYPES = {
+    'transforms': AppliedPTransform,
+    'pcollections': pvalue.PCollection,
+    'coders': coders.Coder,
+    'windowing_strategies': core.Windowing,
+    # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    for name, cls in self._COMPONENT_TYPES.items():
+      setattr(self, name,
+              PipelineContextMap(self, cls, getattr(context_proto, name, None)))
+
+  def to_runner_api(self):
+    context_proto = beam_runner_api_pb2.Components()
+    for name, cls in self._COMPONENT_TYEPS:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto


[7/9] beam git commit: Runner API encoding of WindowFns.

Posted by ro...@apache.org.
Runner API encoding of WindowFns.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a

Branch: refs/heads/master
Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f
Parents: bc76a18
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:21:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py    | 117 +++++++++++++++++++
 .../apache_beam/transforms/window_test.py       |  11 ++
 sdks/python/apache_beam/utils/proto_utils.py    |  37 ++++++
 sdks/python/apache_beam/utils/urns.py           |   7 ++
 4 files changed, 172 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 14cf2f6..a562bcf 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,13 +49,20 @@ WindowFn.
 
 from __future__ import absolute_import
 
+from google.protobuf import struct_pb2
+from google.protobuf import wrappers_pb2
+
 from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.transforms.timeutil import Duration
 from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.timeutil import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
@@ -131,6 +138,41 @@ class WindowFn(object):
     # By default, just return the input timestamp.
     return input_timestamp
 
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    cls._known_urns[urn] = parameter_type, constructor
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+    return constructor(
+        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        context)
+
+  def to_runner_api(self, context):
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        spec=beam_runner_api_pb2.UrnWithParameter(
+            urn=urn,
+            parameter=proto_utils.pack_Any(typed_param)))
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return pickler.loads(fn_parameter.value)
+
+  def to_runner_api_parameter(self, context):
+    raise TypeError(self)
+    return (urns.PICKLED_WINDOW_FN,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+
+WindowFn.register_urn(
+    urns.PICKLED_WINDOW_FN,
+    wrappers_pb2.BytesValue,
+    WindowFn.from_runner_api_parameter)
+
 
 class BoundedWindow(object):
   """A window for timestamps in range (-infinity, end).
@@ -251,6 +293,16 @@ class GlobalWindows(WindowFn):
   def __ne__(self, other):
     return not self == other
 
+  @staticmethod
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    return GlobalWindows()
+
+  def to_runner_api_parameter(self, context):
+    return urns.GLOBAL_WINDOWS_FN, None
+
+WindowFn.register_urn(
+    urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+
 
 class FixedWindows(WindowFn):
   """A windowing function that assigns each element to one time interval.
@@ -280,6 +332,29 @@ class FixedWindows(WindowFn):
   def merge(self, merge_context):
     pass  # No merging.
 
+  def __eq__(self, other):
+    if type(self) == type(other) == FixedWindows:
+      return self.size == other.size and self.offset == other.offset
+
+  def __ne__(self, other):
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return FixedWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']))
+
+  def to_runner_api_parameter(self, context):
+    return (urns.FIXED_WINDOWS_FN,
+            proto_utils.pack_Struct(size=self.size.micros,
+                                    offset=self.offset.micros))
+
+WindowFn.register_urn(
+    urns.FIXED_WINDOWS_FN,
+    struct_pb2.Struct,
+    FixedWindows.from_runner_api_parameter)
+
 
 class SlidingWindows(WindowFn):
   """A windowing function that assigns each element to a set of sliding windows.
@@ -312,6 +387,31 @@ class SlidingWindows(WindowFn):
   def merge(self, merge_context):
     pass  # No merging.
 
+  def __eq__(self, other):
+    if type(self) == type(other) == SlidingWindows:
+      return (self.size == other.size
+              and self.offset == other.offset
+              and self.period == other.period)
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return SlidingWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']),
+        period=Duration(micros=fn_parameter['period']))
+
+  def to_runner_api_parameter(self, context):
+    return (urns.SLIDING_WINDOWS_FN,
+            proto_utils.pack_Struct(
+                size=self.size.micros,
+                offset=self.offset.micros,
+                period=self.period.micros))
+
+WindowFn.register_urn(
+    urns.SLIDING_WINDOWS_FN,
+    struct_pb2.Struct,
+    SlidingWindows.from_runner_api_parameter)
+
 
 class Sessions(WindowFn):
   """A windowing function that groups elements into sessions.
@@ -352,3 +452,20 @@ class Sessions(WindowFn):
         end = w.end
     if len(to_merge) > 1:
       merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end))
+
+  def __eq__(self, other):
+    if type(self) == type(other) == Sessions:
+      return self.gap_size == other.gap_size
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
+
+  def to_runner_api_parameter(self, context):
+    return (urns.SESSION_WINDOWS_FN,
+            proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+
+WindowFn.register_urn(
+    urns.SESSION_WINDOWS_FN,
+    struct_pb2.Struct,
+    Sessions.from_runner_api_parameter)

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c4072ac..821b143 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,6 +19,7 @@
 
 import unittest
 
+from apache_beam import pipeline
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
@@ -32,6 +33,7 @@ from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import SlidingWindows
@@ -224,6 +226,15 @@ class WindowTest(unittest.TestCase):
                 label='assert:mean')
     p.run()
 
+  def test_runner_api(self):
+    for window_fn in (GlobalWindows(),
+                      FixedWindows(37),
+                      SlidingWindows(2, 389),
+                      Sessions(5077)):
+      context = pipeline.PipelineContext()
+      self.assertEqual(
+          window_fn,
+          WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
new file mode 100644
index 0000000..0ece8f5
--- /dev/null
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -0,0 +1,37 @@
+from google.protobuf import any_pb2
+from google.protobuf import struct_pb2
+
+
+def pack_Any(msg):
+  """Creates a protobuf Any with msg as its content.
+
+  Returns None if msg is None.
+  """
+  if msg is None:
+    return None
+  else:
+    result = any_pb2.Any()
+    result.Pack(msg)
+    return result
+
+
+def unpack_Any(any_msg, msg_class):
+  """Unpacks any_msg into msg_class.
+
+  Returns None if msg_class is None.
+  """
+  if msg_class is None:
+    return None
+  else:
+    msg = msg_class()
+    any_msg.Unpack(msg)
+    return msg
+
+
+def pack_Struct(**kwargs):
+  """Returns a struct containing the values indicated by kwargs.
+  """
+  msg = struct_pb2.Struct()
+  for key, value in kwargs.items():
+    msg[key] = value  # pylint: disable=unsubscriptable-object
+  return msg

http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
new file mode 100644
index 0000000..4d1c2f7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -0,0 +1,7 @@
+PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+
+PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"  # NOTE(review): "dataflow:" prefix, unlike the "beam:" URNs - confirm


[5/9] beam git commit: Add license to new files.

Posted by ro...@apache.org.
Add license to new files.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2da21e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2da21e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2da21e2

Branch: refs/heads/master
Commit: b2da21e287660bb3077bf89e092f7aa3c385906b
Parents: 5b86e1f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Mar 8 13:47:53 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/api/__init__.py    | 16 ++++++++++++++++
 .../apache_beam/runners/api/beam_runner_api_pb2.py | 17 +++++++++++++++++
 sdks/python/apache_beam/utils/proto_utils.py       | 17 +++++++++++++++++
 sdks/python/apache_beam/utils/urns.py              | 17 +++++++++++++++++
 4 files changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
index e69de29..cce3aca 100644
--- a/sdks/python/apache_beam/runners/api/__init__.py
+++ b/sdks/python/apache_beam/runners/api/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
index 66c331b..f235ce8 100644
--- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: beam_runner_api.proto
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index 0ece8f5..b4bfdca 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from google.protobuf import any_pb2
 from google.protobuf import struct_pb2
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 4d1c2f7..186c99c 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
 GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
 FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"


[3/9] beam git commit: Auto-generated runner api proto bindings.

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
new file mode 100644
index 0000000..66c331b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -0,0 +1,2755 @@
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: beam_runner_api.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf.internal import enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='beam_runner_api.proto',
+  package='org.apache.beam.runner_api.v1',
+  syntax='proto3',
+  serialized_pb=_b('\n\x15\x62\x65\x61m_runner_api.proto\x12\x1dorg.apache.beam.runner_api.v1\x1a\x19google/protobuf/any.proto\"\x8d\x07\n\nComponents\x12M\n\ntransforms\x18\x01 \x03(\x0b\x32\x39.org.apache.beam.runner_api.v1.Components.TransformsEntry\x12Q\n\x0cpcollections\x18\x02 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.PcollectionsEntry\x12`\n\x14windowing_strategies\x18\x03 \x03(\x0b\x32\x42.org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry\x12\x45\n\x06\x63oders\x18\x04 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.Components.CodersEntry\x12Q\n\x0c\x65nvironments\x18\x05 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.EnvironmentsEntry\x1a\\\n\x0fTransformsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransform:\x02\x38\x01\x1a_\n\x11PcollectionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollection:\x02\
 x38\x01\x1al\n\x18WindowingStrategiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategy:\x02\x38\x01\x1aS\n\x0b\x43odersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.Coder:\x02\x38\x01\x1a_\n\x11\x45nvironmentsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.Environment:\x02\x38\x01\"\xe4\x06\n\x15MessageWithComponents\x12=\n\ncomponents\x18\x01 \x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x35\n\x05\x63oder\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.CoderH\x00\x12H\n\x0f\x63ombine_payload\x18\x03 \x01(\x0b\x32-.org.apache.beam.runner_api.v1.CombinePayloadH\x00\x12\x44\n\rfunction_spec\x18\x04 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpecH\x00\x12\x45\n\x0epar_do_payload\x18\x06 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.ParDoPayloadH\x00\x12?\
 n\nptransform\x18\x07 \x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransformH\x00\x12\x41\n\x0bpcollection\x18\x08 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollectionH\x00\x12\x42\n\x0cread_payload\x18\t \x01(\x0b\x32*.org.apache.beam.runner_api.v1.ReadPayloadH\x00\x12>\n\nside_input\x18\x0b \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInputH\x00\x12O\n\x13window_into_payload\x18\x0c \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowIntoPayloadH\x00\x12N\n\x12windowing_strategy\x18\r \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategyH\x00\x12M\n\x12urn_with_parameter\x18\x0e \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameterH\x00\x42\x06\n\x04root\"\xa6\x01\n\x08Pipeline\x12=\n\ncomponents\x18\x01 \x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x19\n\x11root_transform_id\x18\x02 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x03 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xa8\x03\n\nPTransform\x12\x13\n\x0bunique_name\x18\x05
  \x01(\t\x12=\n\x04spec\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x15\n\rsubtransforms\x18\x02 \x03(\t\x12\x45\n\x06inputs\x18\x03 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.PTransform.InputsEntry\x12G\n\x07outputs\x18\x04 \x03(\x0b\x32\x36.org.apache.beam.runner_api.v1.PTransform.OutputsEntry\x12@\n\x0c\x64isplay_data\x18\x06 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\x1a-\n\x0bInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a.\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd3\x01\n\x0bPCollection\x12\x13\n\x0bunique_name\x18\x01 \x01(\t\x12\x10\n\x08\x63oder_id\x18\x02 \x01(\t\x12<\n\nis_bounded\x18\x03 \x01(\x0e\x32(.org.apache.beam.runner_api.v1.IsBounded\x12\x1d\n\x15windowing_strategy_id\x18\x04 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x05 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xb5\x03\n\x0cParDoPayload\x12:\n\x05
 \x64o_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12<\n\nparameters\x18\x02 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.Parameter\x12P\n\x0bside_inputs\x18\x03 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry\x12=\n\x0bstate_specs\x18\x04 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.StateSpec\x12=\n\x0btimer_specs\x18\x05 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.TimerSpec\x1a[\n\x0fSideInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInput:\x02\x38\x01\"\x8b\x01\n\tParameter\x12;\n\x04type\x18\x01 \x01(\x0e\x32-.org.apache.beam.runner_api.v1.Parameter.Type\"A\n\x04Type\x12\n\n\x06WINDOW\x10\x00\x12\x14\n\x10PIPELINE_OPTIONS\x10\x01\x12\x17\n\x13RESTRICTION_TRACKER\x10\x02\"\x0b\n\tStateSpec\"\x0b\n\tTimerSpec\"\x88\x01\n\x0bReadPayload\x12;\n\x06source\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12<\n\nis_bounded\x18\x02 \x01(\x0e\x32
 (.org.apache.beam.runner_api.v1.IsBounded\"S\n\x11WindowIntoPayload\x12>\n\twindow_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\"\xde\x02\n\x0e\x43ombinePayload\x12?\n\ncombine_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x1c\n\x14\x61\x63\x63umulator_coder_id\x18\x02 \x01(\t\x12<\n\nparameters\x18\x03 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.Parameter\x12R\n\x0bside_inputs\x18\x04 \x03(\x0b\x32=.org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry\x1a[\n\x0fSideInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInput:\x02\x38\x01\"_\n\x05\x43oder\x12\x39\n\x04spec\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x1b\n\x13\x63omponent_coder_ids\x18\x02 \x03(\t\"\xd7\x03\n\x11WindowingStrategy\x12>\n\twindow_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12@\n\x0cmerge_status\x18\x02 \x01(\x0e\x32*.org.apache
 .beam.runner_api.v1.MergeStatus\x12\x17\n\x0fwindow_coder_id\x18\x03 \x01(\t\x12\x37\n\x07trigger\x18\x04 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12J\n\x11\x61\x63\x63umulation_mode\x18\x05 \x01(\x0e\x32/.org.apache.beam.runner_api.v1.AccumulationMode\x12>\n\x0boutput_time\x18\x06 \x01(\x0e\x32).org.apache.beam.runner_api.v1.OutputTime\x12H\n\x10\x63losing_behavior\x18\x07 \x01(\x0e\x32..org.apache.beam.runner_api.v1.ClosingBehavior\x12\x18\n\x10\x61llowed_lateness\x18\x08 \x01(\x03\"\xac\r\n\x07Trigger\x12\x44\n\tafter_all\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.Trigger.AfterAllH\x00\x12\x44\n\tafter_any\x18\x02 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.Trigger.AfterAnyH\x00\x12\x46\n\nafter_each\x18\x03 \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.Trigger.AfterEachH\x00\x12U\n\x12\x61\x66ter_end_of_widow\x18\x04 \x01(\x0b\x32\x37.org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindowH\x00\x12[\n\x15\x61\x66ter_processing_time\x18\x05 \x01(\x0b\x32:
 .org.apache.beam.runner_api.v1.Trigger.AfterProcessingTimeH\x00\x12t\n\"after_synchronized_processing_time\x18\x06 \x01(\x0b\x32\x46.org.apache.beam.runner_api.v1.Trigger.AfterSynchronizedProcessingTimeH\x00\x12?\n\x06\x61lways\x18\x0c \x01(\x0b\x32-.org.apache.beam.runner_api.v1.Trigger.AlwaysH\x00\x12\x41\n\x07\x64\x65\x66\x61ult\x18\x07 \x01(\x0b\x32..org.apache.beam.runner_api.v1.Trigger.DefaultH\x00\x12L\n\relement_count\x18\x08 \x01(\x0b\x32\x33.org.apache.beam.runner_api.v1.Trigger.ElementCountH\x00\x12=\n\x05never\x18\t \x01(\x0b\x32,.org.apache.beam.runner_api.v1.Trigger.NeverH\x00\x12\x46\n\nor_finally\x18\n \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.Trigger.OrFinallyH\x00\x12?\n\x06repeat\x18\x0b \x01(\x0b\x32-.org.apache.beam.runner_api.v1.Trigger.RepeatH\x00\x1aG\n\x08\x41\x66terAll\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1aG\n\x08\x41\x66terAny\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Tr
 igger\x1aH\n\tAfterEach\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x8f\x01\n\x10\x41\x66terEndOfWindow\x12=\n\rearly_firings\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12<\n\x0clate_firings\x18\x02 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x66\n\x13\x41\x66terProcessingTime\x12O\n\x14timestamp_transforms\x18\x01 \x03(\x0b\x32\x31.org.apache.beam.runner_api.v1.TimestampTransform\x1a!\n\x1f\x41\x66terSynchronizedProcessingTime\x1a\t\n\x07\x44\x65\x66\x61ult\x1a%\n\x0c\x45lementCount\x12\x15\n\relement_count\x18\x01 \x01(\x05\x1a\x07\n\x05Never\x1a\x08\n\x06\x41lways\x1az\n\tOrFinally\x12\x34\n\x04main\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12\x37\n\x07\x66inally\x18\x02 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x44\n\x06Repeat\x12:\n\nsubtrigger\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.TriggerB\t\n\x07trigger\"\x8e\x02\n\x12TimestampTransform\x12H\n\x05\x64\x
 65lay\x18\x01 \x01(\x0b\x32\x37.org.apache.beam.runner_api.v1.TimestampTransform.DelayH\x00\x12M\n\x08\x61lign_to\x18\x02 \x01(\x0b\x32\x39.org.apache.beam.runner_api.v1.TimestampTransform.AlignToH\x00\x1a\x1d\n\x05\x44\x65lay\x12\x14\n\x0c\x64\x65lay_millis\x18\x01 \x01(\x03\x1a)\n\x07\x41lignTo\x12\x0e\n\x06period\x18\x03 \x01(\x03\x12\x0e\n\x06offset\x18\x04 \x01(\x03\x42\x15\n\x13timestamp_transform\"\xda\x01\n\tSideInput\x12G\n\x0e\x61\x63\x63\x65ss_pattern\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12<\n\x07view_fn\x18\x02 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x46\n\x11window_mapping_fn\x18\x03 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\"\x1a\n\x0b\x45nvironment\x12\x0b\n\x03url\x18\x01 \x01(\t\"e\n\x0c\x46unctionSpec\x12=\n\x04spec\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x16\n\x0e\x65nvironment_id\x18\x02 \x01(\t\"H\n\x10UrnWithParameter\x12\x0b\n\x03urn\x18\x01 \x01(\t\x12\'\
 n\tparameter\x18\x02 \x01(\x0b\x32\x14.google.protobuf.Any\"\xf7\x03\n\x0b\x44isplayData\x12>\n\x05items\x18\x01 \x03(\x0b\x32/.org.apache.beam.runner_api.v1.DisplayData.Item\x1a\x46\n\nIdentifier\x12\x14\n\x0ctransform_id\x18\x01 \x01(\t\x12\x15\n\rtransform_urn\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x1a\xf9\x01\n\x04Item\x12\x41\n\x02id\x18\x01 \x01(\x0b\x32\x35.org.apache.beam.runner_api.v1.DisplayData.Identifier\x12=\n\x04type\x18\x02 \x01(\x0e\x32/.org.apache.beam.runner_api.v1.DisplayData.Type\x12#\n\x05value\x18\x03 \x01(\x0b\x32\x14.google.protobuf.Any\x12)\n\x0bshort_value\x18\x04 \x01(\x0b\x32\x14.google.protobuf.Any\x12\r\n\x05label\x18\x05 \x01(\t\x12\x10\n\x08link_url\x18\x06 \x01(\t\"d\n\x04Type\x12\n\n\x06STRING\x10\x00\x12\x0b\n\x07INTEGER\x10\x01\x12\t\n\x05\x46LOAT\x10\x02\x12\x0b\n\x07\x42OOLEAN\x10\x03\x12\r\n\tTIMESTAMP\x10\x04\x12\x0c\n\x08\x44URATION\x10\x05\x12\x0e\n\nJAVA_CLASS\x10\x06*\'\n\tIsBounded\x12\x0b\n\x07\x42OUNDED\x10\x00\x12\r\n\tUNBOUN
 DED\x10\x01*C\n\x0bMergeStatus\x12\x0f\n\x0bNON_MERGING\x10\x00\x12\x0f\n\x0bNEEDS_MERGE\x10\x01\x12\x12\n\x0e\x41LREADY_MERGED\x10\x02*4\n\x10\x41\x63\x63umulationMode\x12\x0e\n\nDISCARDING\x10\x00\x12\x10\n\x0c\x41\x43\x43UMULATING\x10\x01*8\n\x0f\x43losingBehavior\x12\x0f\n\x0b\x45MIT_ALWAYS\x10\x00\x12\x14\n\x10\x45MIT_IF_NONEMPTY\x10\x01*I\n\nOutputTime\x12\x11\n\rEND_OF_WINDOW\x10\x00\x12\x12\n\x0eLATEST_IN_PANE\x10\x01\x12\x14\n\x10\x45\x41RLIEST_IN_PANE\x10\x02*S\n\nTimeDomain\x12\x0e\n\nEVENT_TIME\x10\x00\x12\x13\n\x0fPROCESSING_TIME\x10\x01\x12 \n\x1cSYNCHRONIZED_PROCESSING_TIME\x10\x02\x42\x31\n$org.apache.beam.sdk.common.runner.v1B\tRunnerApib\x06proto3')
+  ,
+  dependencies=[google_dot_protobuf_dot_any__pb2.DESCRIPTOR,])
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+_ISBOUNDED = _descriptor.EnumDescriptor(
+  name='IsBounded',
+  full_name='org.apache.beam.runner_api.v1.IsBounded',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='BOUNDED', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='UNBOUNDED', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7348,
+  serialized_end=7387,
+)
+_sym_db.RegisterEnumDescriptor(_ISBOUNDED)
+
+IsBounded = enum_type_wrapper.EnumTypeWrapper(_ISBOUNDED)
+_MERGESTATUS = _descriptor.EnumDescriptor(
+  name='MergeStatus',
+  full_name='org.apache.beam.runner_api.v1.MergeStatus',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='NON_MERGING', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='NEEDS_MERGE', index=1, number=1,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='ALREADY_MERGED', index=2, number=2,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7389,
+  serialized_end=7456,
+)
+_sym_db.RegisterEnumDescriptor(_MERGESTATUS)
+
+MergeStatus = enum_type_wrapper.EnumTypeWrapper(_MERGESTATUS)
+_ACCUMULATIONMODE = _descriptor.EnumDescriptor(
+  name='AccumulationMode',
+  full_name='org.apache.beam.runner_api.v1.AccumulationMode',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='DISCARDING', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='ACCUMULATING', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7458,
+  serialized_end=7510,
+)
+_sym_db.RegisterEnumDescriptor(_ACCUMULATIONMODE)
+
+AccumulationMode = enum_type_wrapper.EnumTypeWrapper(_ACCUMULATIONMODE)
+_CLOSINGBEHAVIOR = _descriptor.EnumDescriptor(
+  name='ClosingBehavior',
+  full_name='org.apache.beam.runner_api.v1.ClosingBehavior',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='EMIT_ALWAYS', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='EMIT_IF_NONEMPTY', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7512,
+  serialized_end=7568,
+)
+_sym_db.RegisterEnumDescriptor(_CLOSINGBEHAVIOR)
+
+ClosingBehavior = enum_type_wrapper.EnumTypeWrapper(_CLOSINGBEHAVIOR)
+_OUTPUTTIME = _descriptor.EnumDescriptor(
+  name='OutputTime',
+  full_name='org.apache.beam.runner_api.v1.OutputTime',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='END_OF_WINDOW', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='LATEST_IN_PANE', index=1, number=1,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='EARLIEST_IN_PANE', index=2, number=2,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7570,
+  serialized_end=7643,
+)
+_sym_db.RegisterEnumDescriptor(_OUTPUTTIME)
+
+OutputTime = enum_type_wrapper.EnumTypeWrapper(_OUTPUTTIME)
+_TIMEDOMAIN = _descriptor.EnumDescriptor(
+  name='TimeDomain',
+  full_name='org.apache.beam.runner_api.v1.TimeDomain',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='EVENT_TIME', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='PROCESSING_TIME', index=1, number=1,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='SYNCHRONIZED_PROCESSING_TIME', index=2, number=2,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7645,
+  serialized_end=7728,
+)
+_sym_db.RegisterEnumDescriptor(_TIMEDOMAIN)
+
+TimeDomain = enum_type_wrapper.EnumTypeWrapper(_TIMEDOMAIN)
+BOUNDED = 0
+UNBOUNDED = 1
+NON_MERGING = 0
+NEEDS_MERGE = 1
+ALREADY_MERGED = 2
+DISCARDING = 0
+ACCUMULATING = 1
+EMIT_ALWAYS = 0
+EMIT_IF_NONEMPTY = 1
+END_OF_WINDOW = 0
+LATEST_IN_PANE = 1
+EARLIEST_IN_PANE = 2
+EVENT_TIME = 0
+PROCESSING_TIME = 1
+SYNCHRONIZED_PROCESSING_TIME = 2
+
+
+_PARAMETER_TYPE = _descriptor.EnumDescriptor(
+  name='Type',
+  full_name='org.apache.beam.runner_api.v1.Parameter.Type',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='WINDOW', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='PIPELINE_OPTIONS', index=1, number=1,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='RESTRICTION_TRACKER', index=2, number=2,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=3191,
+  serialized_end=3256,
+)
+_sym_db.RegisterEnumDescriptor(_PARAMETER_TYPE)
+
+_DISPLAYDATA_TYPE = _descriptor.EnumDescriptor(
+  name='Type',
+  full_name='org.apache.beam.runner_api.v1.DisplayData.Type',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='STRING', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='INTEGER', index=1, number=1,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='FLOAT', index=2, number=2,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='BOOLEAN', index=3, number=3,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='TIMESTAMP', index=4, number=4,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='DURATION', index=5, number=5,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='JAVA_CLASS', index=6, number=6,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=7246,
+  serialized_end=7346,
+)
+_sym_db.RegisterEnumDescriptor(_DISPLAYDATA_TYPE)
+
+
+_COMPONENTS_TRANSFORMSENTRY = _descriptor.Descriptor(
+  name='TransformsEntry',
+  full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=512,
+  serialized_end=604,
+)
+
+_COMPONENTS_PCOLLECTIONSENTRY = _descriptor.Descriptor(
+  name='PcollectionsEntry',
+  full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=606,
+  serialized_end=701,
+)
+
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY = _descriptor.Descriptor(
+  name='WindowingStrategiesEntry',
+  full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=703,
+  serialized_end=811,
+)
+
+_COMPONENTS_CODERSENTRY = _descriptor.Descriptor(
+  name='CodersEntry',
+  full_name='org.apache.beam.runner_api.v1.Components.CodersEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.Components.CodersEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.Components.CodersEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=813,
+  serialized_end=896,
+)
+
+_COMPONENTS_ENVIRONMENTSENTRY = _descriptor.Descriptor(
+  name='EnvironmentsEntry',
+  full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=898,
+  serialized_end=993,
+)
+
+_COMPONENTS = _descriptor.Descriptor(
+  name='Components',
+  full_name='org.apache.beam.runner_api.v1.Components',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='transforms', full_name='org.apache.beam.runner_api.v1.Components.transforms', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='pcollections', full_name='org.apache.beam.runner_api.v1.Components.pcollections', index=1,
+      number=2, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='windowing_strategies', full_name='org.apache.beam.runner_api.v1.Components.windowing_strategies', index=2,
+      number=3, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='coders', full_name='org.apache.beam.runner_api.v1.Components.coders', index=3,
+      number=4, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='environments', full_name='org.apache.beam.runner_api.v1.Components.environments', index=4,
+      number=5, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_COMPONENTS_TRANSFORMSENTRY, _COMPONENTS_PCOLLECTIONSENTRY, _COMPONENTS_WINDOWINGSTRATEGIESENTRY, _COMPONENTS_CODERSENTRY, _COMPONENTS_ENVIRONMENTSENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=84,
+  serialized_end=993,
+)
+
+
+_MESSAGEWITHCOMPONENTS = _descriptor.Descriptor(
+  name='MessageWithComponents',
+  full_name='org.apache.beam.runner_api.v1.MessageWithComponents',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='components', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.components', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='coder', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.coder', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='combine_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.combine_payload', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='function_spec', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.function_spec', index=3,
+      number=4, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='par_do_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.par_do_payload', index=4,
+      number=6, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='ptransform', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.ptransform', index=5,
+      number=7, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='pcollection', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.pcollection', index=6,
+      number=8, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='read_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.read_payload', index=7,
+      number=9, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='side_input', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.side_input', index=8,
+      number=11, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='window_into_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.window_into_payload', index=9,
+      number=12, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='windowing_strategy', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.windowing_strategy', index=10,
+      number=13, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='urn_with_parameter', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.urn_with_parameter', index=11,
+      number=14, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+    _descriptor.OneofDescriptor(
+      name='root', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.root',
+      index=0, containing_type=None, fields=[]),
+  ],
+  serialized_start=996,
+  serialized_end=1864,
+)
+
+
+_PIPELINE = _descriptor.Descriptor(
+  name='Pipeline',
+  full_name='org.apache.beam.runner_api.v1.Pipeline',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='components', full_name='org.apache.beam.runner_api.v1.Pipeline.components', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='root_transform_id', full_name='org.apache.beam.runner_api.v1.Pipeline.root_transform_id', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='display_data', full_name='org.apache.beam.runner_api.v1.Pipeline.display_data', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1867,
+  serialized_end=2033,
+)
+
+
+_PTRANSFORM_INPUTSENTRY = _descriptor.Descriptor(
+  name='InputsEntry',
+  full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry.value', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2367,
+  serialized_end=2412,
+)
+
+_PTRANSFORM_OUTPUTSENTRY = _descriptor.Descriptor(
+  name='OutputsEntry',
+  full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry.value', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2414,
+  serialized_end=2460,
+)
+
+_PTRANSFORM = _descriptor.Descriptor(
+  name='PTransform',
+  full_name='org.apache.beam.runner_api.v1.PTransform',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='unique_name', full_name='org.apache.beam.runner_api.v1.PTransform.unique_name', index=0,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='spec', full_name='org.apache.beam.runner_api.v1.PTransform.spec', index=1,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='subtransforms', full_name='org.apache.beam.runner_api.v1.PTransform.subtransforms', index=2,
+      number=2, type=9, cpp_type=9, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='inputs', full_name='org.apache.beam.runner_api.v1.PTransform.inputs', index=3,
+      number=3, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='outputs', full_name='org.apache.beam.runner_api.v1.PTransform.outputs', index=4,
+      number=4, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='display_data', full_name='org.apache.beam.runner_api.v1.PTransform.display_data', index=5,
+      number=6, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_PTRANSFORM_INPUTSENTRY, _PTRANSFORM_OUTPUTSENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2036,
+  serialized_end=2460,
+)
+
+
+_PCOLLECTION = _descriptor.Descriptor(
+  name='PCollection',
+  full_name='org.apache.beam.runner_api.v1.PCollection',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='unique_name', full_name='org.apache.beam.runner_api.v1.PCollection.unique_name', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='coder_id', full_name='org.apache.beam.runner_api.v1.PCollection.coder_id', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='is_bounded', full_name='org.apache.beam.runner_api.v1.PCollection.is_bounded', index=2,
+      number=3, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='windowing_strategy_id', full_name='org.apache.beam.runner_api.v1.PCollection.windowing_strategy_id', index=3,
+      number=4, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='display_data', full_name='org.apache.beam.runner_api.v1.PCollection.display_data', index=4,
+      number=5, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2463,
+  serialized_end=2674,
+)
+
+
+_PARDOPAYLOAD_SIDEINPUTSENTRY = _descriptor.Descriptor(
+  name='SideInputsEntry',
+  full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3023,
+  serialized_end=3114,
+)
+
+_PARDOPAYLOAD = _descriptor.Descriptor(
+  name='ParDoPayload',
+  full_name='org.apache.beam.runner_api.v1.ParDoPayload',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='do_fn', full_name='org.apache.beam.runner_api.v1.ParDoPayload.do_fn', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='parameters', full_name='org.apache.beam.runner_api.v1.ParDoPayload.parameters', index=1,
+      number=2, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='side_inputs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.side_inputs', index=2,
+      number=3, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='state_specs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.state_specs', index=3,
+      number=4, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='timer_specs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.timer_specs', index=4,
+      number=5, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_PARDOPAYLOAD_SIDEINPUTSENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2677,
+  serialized_end=3114,
+)
+
+
+_PARAMETER = _descriptor.Descriptor(
+  name='Parameter',
+  full_name='org.apache.beam.runner_api.v1.Parameter',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='type', full_name='org.apache.beam.runner_api.v1.Parameter.type', index=0,
+      number=1, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+    _PARAMETER_TYPE,
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3117,
+  serialized_end=3256,
+)
+
+
+_STATESPEC = _descriptor.Descriptor(
+  name='StateSpec',
+  full_name='org.apache.beam.runner_api.v1.StateSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3258,
+  serialized_end=3269,
+)
+
+
+_TIMERSPEC = _descriptor.Descriptor(
+  name='TimerSpec',
+  full_name='org.apache.beam.runner_api.v1.TimerSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3271,
+  serialized_end=3282,
+)
+
+
+_READPAYLOAD = _descriptor.Descriptor(
+  name='ReadPayload',
+  full_name='org.apache.beam.runner_api.v1.ReadPayload',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='source', full_name='org.apache.beam.runner_api.v1.ReadPayload.source', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='is_bounded', full_name='org.apache.beam.runner_api.v1.ReadPayload.is_bounded', index=1,
+      number=2, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3285,
+  serialized_end=3421,
+)
+
+
+_WINDOWINTOPAYLOAD = _descriptor.Descriptor(
+  name='WindowIntoPayload',
+  full_name='org.apache.beam.runner_api.v1.WindowIntoPayload',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='window_fn', full_name='org.apache.beam.runner_api.v1.WindowIntoPayload.window_fn', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3423,
+  serialized_end=3506,
+)
+
+
+_COMBINEPAYLOAD_SIDEINPUTSENTRY = _descriptor.Descriptor(
+  name='SideInputsEntry',
+  full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry.value', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3023,
+  serialized_end=3114,
+)
+
+_COMBINEPAYLOAD = _descriptor.Descriptor(
+  name='CombinePayload',
+  full_name='org.apache.beam.runner_api.v1.CombinePayload',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='combine_fn', full_name='org.apache.beam.runner_api.v1.CombinePayload.combine_fn', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='accumulator_coder_id', full_name='org.apache.beam.runner_api.v1.CombinePayload.accumulator_coder_id', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='parameters', full_name='org.apache.beam.runner_api.v1.CombinePayload.parameters', index=2,
+      number=3, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='side_inputs', full_name='org.apache.beam.runner_api.v1.CombinePayload.side_inputs', index=3,
+      number=4, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_COMBINEPAYLOAD_SIDEINPUTSENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3509,
+  serialized_end=3859,
+)
+
+
+_CODER = _descriptor.Descriptor(
+  name='Coder',
+  full_name='org.apache.beam.runner_api.v1.Coder',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='spec', full_name='org.apache.beam.runner_api.v1.Coder.spec', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='component_coder_ids', full_name='org.apache.beam.runner_api.v1.Coder.component_coder_ids', index=1,
+      number=2, type=9, cpp_type=9, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3861,
+  serialized_end=3956,
+)
+
+
+_WINDOWINGSTRATEGY = _descriptor.Descriptor(
+  name='WindowingStrategy',
+  full_name='org.apache.beam.runner_api.v1.WindowingStrategy',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='window_fn', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.window_fn', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='merge_status', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.merge_status', index=1,
+      number=2, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='window_coder_id', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.window_coder_id', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='trigger', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.trigger', index=3,
+      number=4, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='accumulation_mode', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.accumulation_mode', index=4,
+      number=5, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='output_time', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.output_time', index=5,
+      number=6, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='closing_behavior', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.closing_behavior', index=6,
+      number=7, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='allowed_lateness', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.allowed_lateness', index=7,
+      number=8, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=3959,
+  serialized_end=4430,
+)
+
+
+_TRIGGER_AFTERALL = _descriptor.Descriptor(
+  name='AfterAll',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterAll',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterAll.subtriggers', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5364,
+  serialized_end=5435,
+)
+
+_TRIGGER_AFTERANY = _descriptor.Descriptor(
+  name='AfterAny',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterAny',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterAny.subtriggers', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5437,
+  serialized_end=5508,
+)
+
+_TRIGGER_AFTEREACH = _descriptor.Descriptor(
+  name='AfterEach',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterEach',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEach.subtriggers', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5510,
+  serialized_end=5582,
+)
+
+_TRIGGER_AFTERENDOFWINDOW = _descriptor.Descriptor(
+  name='AfterEndOfWindow',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='early_firings', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow.early_firings', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='late_firings', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow.late_firings', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5585,
+  serialized_end=5728,
+)
+
+_TRIGGER_AFTERPROCESSINGTIME = _descriptor.Descriptor(
+  name='AfterProcessingTime',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterProcessingTime',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='timestamp_transforms', full_name='org.apache.beam.runner_api.v1.Trigger.AfterProcessingTime.timestamp_transforms', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5730,
+  serialized_end=5832,
+)
+
+_TRIGGER_AFTERSYNCHRONIZEDPROCESSINGTIME = _descriptor.Descriptor(
+  name='AfterSynchronizedProcessingTime',
+  full_name='org.apache.beam.runner_api.v1.Trigger.AfterSynchronizedProcessingTime',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5834,
+  serialized_end=5867,
+)
+
+_TRIGGER_DEFAULT = _descriptor.Descriptor(
+  name='Default',
+  full_name='org.apache.beam.runner_api.v1.Trigger.Default',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5869,
+  serialized_end=5878,
+)
+
+_TRIGGER_ELEMENTCOUNT = _descriptor.Descriptor(
+  name='ElementCount',
+  full_name='org.apache.beam.runner_api.v1.Trigger.ElementCount',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='element_count', full_name='org.apache.beam.runner_api.v1.Trigger.ElementCount.element_count', index=0,
+      number=1, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5880,
+  serialized_end=5917,
+)
+
+_TRIGGER_NEVER = _descriptor.Descriptor(
+  name='Never',
+  full_name='org.apache.beam.runner_api.v1.Trigger.Never',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5919,
+  serialized_end=5926,
+)
+
+_TRIGGER_ALWAYS = _descriptor.Descriptor(
+  name='Always',
+  full_name='org.apache.beam.runner_api.v1.Trigger.Always',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5928,
+  serialized_end=5936,
+)
+
+_TRIGGER_ORFINALLY = _descriptor.Descriptor(
+  name='OrFinally',
+  full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='main', full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally.main', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='finally', full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally.finally', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=5938,
+  serialized_end=6060,
+)
+
+_TRIGGER_REPEAT = _descriptor.Descriptor(
+  name='Repeat',
+  full_name='org.apache.beam.runner_api.v1.Trigger.Repeat',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='subtrigger', full_name='org.apache.beam.runner_api.v1.Trigger.Repeat.subtrigger', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6062,
+  serialized_end=6130,
+)
+
+_TRIGGER = _descriptor.Descriptor(
+  name='Trigger',
+  full_name='org.apache.beam.runner_api.v1.Trigger',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='after_all', full_name='org.apache.beam.runner_api.v1.Trigger.after_all', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='after_any', full_name='org.apache.beam.runner_api.v1.Trigger.after_any', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='after_each', full_name='org.apache.beam.runner_api.v1.Trigger.after_each', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='after_end_of_widow', full_name='org.apache.beam.runner_api.v1.Trigger.after_end_of_widow', index=3,
+      number=4, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='after_processing_time', full_name='org.apache.beam.runner_api.v1.Trigger.after_processing_time', index=4,
+      number=5, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='after_synchronized_processing_time', full_name='org.apache.beam.runner_api.v1.Trigger.after_synchronized_processing_time', index=5,
+      number=6, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='always', full_name='org.apache.beam.runner_api.v1.Trigger.always', index=6,
+      number=12, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='default', full_name='org.apache.beam.runner_api.v1.Trigger.default', index=7,
+      number=7, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='element_count', full_name='org.apache.beam.runner_api.v1.Trigger.element_count', index=8,
+      number=8, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='never', full_name='org.apache.beam.runner_api.v1.Trigger.never', index=9,
+      number=9, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='or_finally', full_name='org.apache.beam.runner_api.v1.Trigger.or_finally', index=10,
+      number=10, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='repeat', full_name='org.apache.beam.runner_api.v1.Trigger.repeat', index=11,
+      number=11, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_TRIGGER_AFTERALL, _TRIGGER_AFTERANY, _TRIGGER_AFTEREACH, _TRIGGER_AFTERENDOFWINDOW, _TRIGGER_AFTERPROCESSINGTIME, _TRIGGER_AFTERSYNCHRONIZEDPROCESSINGTIME, _TRIGGER_DEFAULT, _TRIGGER_ELEMENTCOUNT, _TRIGGER_NEVER, _TRIGGER_ALWAYS, _TRIGGER_ORFINALLY, _TRIGGER_REPEAT, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+    _descriptor.OneofDescriptor(
+      name='trigger', full_name='org.apache.beam.runner_api.v1.Trigger.trigger',
+      index=0, containing_type=None, fields=[]),
+  ],
+  serialized_start=4433,
+  serialized_end=6141,
+)
+
+
+_TIMESTAMPTRANSFORM_DELAY = _descriptor.Descriptor(
+  name='Delay',
+  full_name='org.apache.beam.runner_api.v1.TimestampTransform.Delay',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='delay_millis', full_name='org.apache.beam.runner_api.v1.TimestampTransform.Delay.delay_millis', index=0,
+      number=1, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6319,
+  serialized_end=6348,
+)
+
+_TIMESTAMPTRANSFORM_ALIGNTO = _descriptor.Descriptor(
+  name='AlignTo',
+  full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='period', full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo.period', index=0,
+      number=3, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='offset', full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo.offset', index=1,
+      number=4, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6350,
+  serialized_end=6391,
+)
+
+_TIMESTAMPTRANSFORM = _descriptor.Descriptor(
+  name='TimestampTransform',
+  full_name='org.apache.beam.runner_api.v1.TimestampTransform',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='delay', full_name='org.apache.beam.runner_api.v1.TimestampTransform.delay', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='align_to', full_name='org.apache.beam.runner_api.v1.TimestampTransform.align_to', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_TIMESTAMPTRANSFORM_DELAY, _TIMESTAMPTRANSFORM_ALIGNTO, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+    _descriptor.OneofDescriptor(
+      name='timestamp_transform', full_name='org.apache.beam.runner_api.v1.TimestampTransform.timestamp_transform',
+      index=0, containing_type=None, fields=[]),
+  ],
+  serialized_start=6144,
+  serialized_end=6414,
+)
+
+
+_SIDEINPUT = _descriptor.Descriptor(
+  name='SideInput',
+  full_name='org.apache.beam.runner_api.v1.SideInput',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='access_pattern', full_name='org.apache.beam.runner_api.v1.SideInput.access_pattern', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='view_fn', full_name='org.apache.beam.runner_api.v1.SideInput.view_fn', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='window_mapping_fn', full_name='org.apache.beam.runner_api.v1.SideInput.window_mapping_fn', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6417,
+  serialized_end=6635,
+)
+
+
+_ENVIRONMENT = _descriptor.Descriptor(
+  name='Environment',
+  full_name='org.apache.beam.runner_api.v1.Environment',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='url', full_name='org.apache.beam.runner_api.v1.Environment.url', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6637,
+  serialized_end=6663,
+)
+
+
+_FUNCTIONSPEC = _descriptor.Descriptor(
+  name='FunctionSpec',
+  full_name='org.apache.beam.runner_api.v1.FunctionSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='spec', full_name='org.apache.beam.runner_api.v1.FunctionSpec.spec', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='environment_id', full_name='org.apache.beam.runner_api.v1.FunctionSpec.environment_id', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6665,
+  serialized_end=6766,
+)
+
+
+_URNWITHPARAMETER = _descriptor.Descriptor(
+  name='UrnWithParameter',
+  full_name='org.apache.beam.runner_api.v1.UrnWithParameter',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='urn', full_name='org.apache.beam.runner_api.v1.UrnWithParameter.urn', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='parameter', full_name='org.apache.beam.runner_api.v1.UrnWithParameter.parameter', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6768,
+  serialized_end=6840,
+)
+
+
+_DISPLAYDATA_IDENTIFIER = _descriptor.Descriptor(
+  name='Identifier',
+  full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='transform_id', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.transform_id', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='transform_urn', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.transform_urn', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='key', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.key', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6922,
+  serialized_end=6992,
+)
+
+_DISPLAYDATA_ITEM = _descriptor.Descriptor(
+  name='Item',
+  full_name='org.apache.beam.runner_api.v1.DisplayData.Item',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='id', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.id', index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='type', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.type', index=1,
+      number=2, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.value', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='short_value', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.short_value', index=3,
+      number=4, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='label', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.label', index=4,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='link_url', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.link_url', index=5,
+      number=6, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6995,
+  serialized_end=7244,
+)
+
+_DISPLAYDATA = _descriptor.Descriptor(
+  name='DisplayData',
+  full_name='org.apache.beam.runner_api.v1.DisplayData',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='items', full_name='org.apache.beam.runner_api.v1.DisplayData.items', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_DISPLAYDATA_IDENTIFIER, _DISPLAYDATA_ITEM, ],
+  enum_types=[
+    _DISPLAYDATA_TYPE,
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=6843,
+  serialized_end=7346,
+)
+
+_COMPONENTS_TRANSFORMSENTRY.fields_by_name['value'].message_type = _PTRANSFORM
+_COMPONENTS_TRANSFORMSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_PCOLLECTIONSENTRY.fields_by_name['value'].message_type = _PCOLLECTION
+_COMPONENTS_PCOLLECTIONSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY.fields_by_name['value'].message_type = _WINDOWINGSTRATEGY
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY.containing_type = _COMPONENTS
+_COMPONENTS_CODERSENTRY.fields_by_name['value'].message_type = _CODER
+_COMPONENTS_CODERSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_ENVIRONMENTSENTRY.fields_by_name['value'].message_type = _ENVIRONMENT
+_COMPONENTS_ENVIRONMENTSENTRY.containing_type = _COMPONENTS
+_COMPONENTS.fields_by_name['transforms'].message_type = _COMPONENTS_TRANSFORMSENTRY
+_COMPONENTS.fields_by_name['pcollections'].message_type = _COMPONENTS_PCOLLECTIONSENTRY
+_COMPONENTS.fields_by_name['windowing_strategies'].message_type = _COMPONENTS_WINDOWINGSTRATEGIESENTRY
+_COMPONENTS.fields_by_name['coders'].message_type = _COMPONENTS_CODERSENTRY
+_COMPONENTS.fields_by_name['environments'].message_type = _COMPONENTS_ENVIRONMENTSENTRY
+_MESSAGEWITHCOMPONENTS.fields_by_name['components'].message_type = _COMPONENTS
+_MESSAGEWITHCOMPONENTS.fields_by_name['coder'].message_type = _CODER
+_MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'].message_type = _COMBINEPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'].message_type = _FUNCTIONSPEC
+_MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'].message_type = _PARDOPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'].message_type = _PTRANSFORM
+_MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'].message_type = _PCOLLECTION
+_MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'].message_type = _READPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['side_input'].message_type = _SIDEINPUT
+_MESSAGEWITHCOMPONENTS.fields_by_name['window_into_payload'].message_type = _WINDOWINTOPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['windowing_strategy'].message_type = _WINDOWINGSTRATEGY
+_MESSAGEWITHCOMPONENTS.fields_by_name['urn_with_parameter'].message_type = _URNWITHPARAMETER
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['coder'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['coder'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+  _MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].f

<TRUNCATED>

[2/9] beam git commit: Auto-generated runner api proto bindings.

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index afc5fb4..5e63856 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -34,7 +34,8 @@ EXCLUDED_GENERATED_FILES=(
 "apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py"
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
-"apache_beam/coders/proto2_coder_test_messages_pb2.py")
+"apache_beam/coders/proto2_coder_test_messages_pb2.py"
+"apache_beam/runners/api/beam_runner_api_pb2.py")
 
 FILES_TO_IGNORE=""
 for file in "${EXCLUDED_GENERATED_FILES[@]}"; do


[8/9] beam git commit: Move pipeline context and add more tests.

Posted by ro...@apache.org.
Move pipeline context and add more tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deff128f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deff128f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deff128f

Branch: refs/heads/master
Commit: deff128ff07ccc67956e1d5a94a64f2a31b224c8
Parents: b2da21e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 09:21:33 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:02 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 93 ++++++++++++++++++++
 sdks/python/apache_beam/pipeline.py             | 62 -------------
 .../apache_beam/runners/pipeline_context.py     | 88 ++++++++++++++++++
 .../runners/pipeline_context_test.py            | 49 +++++++++++
 sdks/python/apache_beam/transforms/core.py      |  1 +
 .../apache_beam/transforms/trigger_test.py      | 18 +---
 sdks/python/apache_beam/transforms/window.py    |  2 +-
 .../apache_beam/transforms/window_test.py       |  6 +-
 sdks/python/apache_beam/utils/urns.py           |  2 +-
 9 files changed, 238 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fd72af8..9f5a97a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -266,6 +266,12 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class VarIntCoder(FastCoder):
   """Variable-length integer coder."""
@@ -276,6 +282,12 @@ class VarIntCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class FloatCoder(FastCoder):
   """A coder used for floating-point values."""
@@ -286,6 +298,12 @@ class FloatCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class TimestampCoder(FastCoder):
   """A coder used for timeutil.Timestamp values."""
@@ -296,6 +314,12 @@ class TimestampCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class SingletonCoder(FastCoder):
   """A coder that always encodes exactly one value."""
@@ -309,6 +333,12 @@ class SingletonCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self._value == other._value
+
+  def __hash__(self):
+    return hash(self._value)
+
 
 def maybe_dill_dumps(o):
   """Pickle using cPickle or the Dill pickler as a fallback."""
@@ -365,6 +395,12 @@ class _PickleCoderBase(FastCoder):
   def value_coder(self):
     return self
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class PickleCoder(_PickleCoderBase):
   """Coder using Python's pickle functionality."""
@@ -446,6 +482,12 @@ class FastPrimitivesCoder(FastCoder):
   def value_coder(self):
     return self
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class Base64PickleCoder(Coder):
   """Coder of objects by Python pickle, then base64 encoding."""
@@ -503,6 +545,13 @@ class ProtoCoder(FastCoder):
     # a Map.
     return False
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.proto_message_type == other.proto_message_type)
+
+  def __hash__(self):
+    return hash(self.proto_message_type)
+
   @staticmethod
   def from_type_hint(typehint, unused_registry):
     if issubclass(typehint, google.protobuf.message.Message):
@@ -563,6 +612,13 @@ class TupleCoder(FastCoder):
   def __repr__(self):
     return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
 
+  def __eq__(self, other):  # Same type and equal component coders.
+    return (type(self) == type(other)
+            and self._coders == other._coders)
+
+  def __hash__(self):
+    return hash(self._coders)
+
 
 class TupleSequenceCoder(FastCoder):
   """Coder of homogeneous tuple objects."""
@@ -586,6 +642,13 @@ class TupleSequenceCoder(FastCoder):
   def __repr__(self):
     return 'TupleSequenceCoder[%r]' % self._elem_coder
 
+  def __eq__(self, other):  # Same type and equal element coder.
+    return (type(self) == type(other)
+            and self._elem_coder == other._elem_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._elem_coder))
+
 
 class IterableCoder(FastCoder):
   """Coder of iterables of homogeneous objects."""
@@ -619,6 +682,13 @@ class IterableCoder(FastCoder):
   def __repr__(self):
     return 'IterableCoder[%r]' % self._elem_coder
 
+  def __eq__(self, other):  # Same type and equal element coder.
+    return (type(self) == type(other)
+            and self._elem_coder == other._elem_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._elem_coder))
+
 
 class WindowCoder(PickleCoder):
   """Coder for windows in windowed values."""
@@ -663,6 +733,12 @@ class IntervalWindowCoder(FastCoder):
         '@type': 'kind:interval_window',
     }
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
@@ -709,6 +785,16 @@ class WindowedValueCoder(FastCoder):
   def __repr__(self):
     return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.wrapped_value_coder == other.wrapped_value_coder
+            and self.timestamp_coder == other.timestamp_coder
+            and self.window_coder == other.window_coder)
+
+  def __hash__(self):
+    return hash(
+        (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
+
 
 class LengthPrefixCoder(FastCoder):
   """Coder which prefixes the length of the encoded object in the stream."""
@@ -740,3 +826,10 @@ class LengthPrefixCoder(FastCoder):
 
   def __repr__(self):
     return 'LengthPrefixCoder[%r]' % self._value_coder
+
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._value_coder == other._value_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._value_coder))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 9edcf9b..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,14 +52,11 @@ import os
 import shutil
 import tempfile
 
-from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
-from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -443,62 +440,3 @@ class AppliedPTransform(object):
         if v not in visited:
           visited.add(v)
           visitor.visit_value(v, self)
-
-
-class PipelineContextMap(object):
-  """This is a bi-directional map between objects and ids.
-
-  Under the hood it encodes and decodes these objects into runner API
-  representations.
-  """
-  def __init__(self, context, obj_type, proto_map=None):
-    self._pipeline_context = context
-    self._obj_type = obj_type
-    self._obj_to_id = {}
-    self._id_to_obj = {}
-    self._id_to_proto = proto_map if proto_map else {}
-    self._counter = 0
-
-  def _unique_ref(self):
-    self._counter += 1
-    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
-
-  def populate_map(self, proto_map):
-    for id, obj in self._id_to_obj:
-      proto_map[id] = self._id_to_proto[id]
-
-  def get_id(self, obj):
-    if obj not in self._obj_to_id:
-      id = self._unique_ref()
-      self._id_to_obj[id] = obj
-      self._obj_to_id[obj] = id
-      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
-    return self._obj_to_id[obj]
-
-  def get_by_id(self, id):
-    if id not in self._id_to_obj:
-      self._id_to_obj[id] = self._obj_type.from_runner_api(
-        self._id_to_proto[id], self._pipeline_context)
-    return self._id_to_obj[id]
-
-
-class PipelineContext(object):
-
-  _COMPONENT_TYPES = {
-    'transforms': AppliedPTransform,
-    'pcollections': pvalue.PCollection,
-    'coders': coders.Coder,
-    'windowing_strategies': core.Windowing,
-    # TODO: environment
-  }
-
-  def __init__(self, context_proto=None):
-    for name, cls in self._COMPONENT_TYPES.items():
-      setattr(self, name,
-              PipelineContextMap(self, cls, getattr(context_proto, name, None)))
-
-  def to_runner_api(self):
-    context_proto = beam_runner_api_pb2.Components()
-    for name, cls in self._COMPONENT_TYPES:
-      getattr(self, name).populate_map(getattr(context_proto, name))
-    return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
new file mode 100644
index 0000000..4f82774
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam import coders
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
+
+
+class _PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    self._id_to_proto = proto_map if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    self._counter += 1
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+    for id, proto in self._id_to_proto.items():
+      proto_map[id].CopyFrom(proto)
+
+  def get_id(self, obj):
+    if obj not in self._obj_to_id:
+      id = self._unique_ref()
+      self._id_to_obj[id] = obj
+      self._obj_to_id[obj] = id
+      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+          self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+  """Used for accessing and constructing the referenced objects of a Pipeline.
+  """
+
+  _COMPONENT_TYPES = {
+      'transforms': pipeline.AppliedPTransform,
+      'pcollections': pvalue.PCollection,
+      'coders': coders.Coder,
+      'windowing_strategies': core.Windowing,
+      # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    for name, cls in self._COMPONENT_TYPES.items():
+      setattr(
+          self, name, _PipelineContextMap(
+              self, cls, getattr(context_proto, name, None)))
+
+  @staticmethod
+  def from_runner_api(proto):
+    return PipelineContext(proto)
+
+  def to_runner_api(self):
+    context_proto = beam_runner_api_pb2.Components()
+    for name in self._COMPONENT_TYPES:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py
new file mode 100644
index 0000000..6091ed8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the pipeline context classes."""
+
+import unittest
+
+from apache_beam import coders
+from apache_beam.runners import pipeline_context
+
+
+class PipelineContextTest(unittest.TestCase):
+
+  def test_deduplication(self):
+    context = pipeline_context.PipelineContext()
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
+    self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
+
+  def test_serialization(self):
+    context = pipeline_context.PipelineContext()
+    float_coder_ref = context.coders.get_id(coders.FloatCoder())
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    proto = context.to_runner_api()
+    context2 = pipeline_context.PipelineContext.from_runner_api(proto)
+    self.assertEqual(
+        coders.FloatCoder(),
+        context2.coders.get_by_id(float_coder_ref))
+    self.assertEqual(
+        coders.BytesCoder(),
+        context2.coders.get_by_id(bytes_coder_ref))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 1fc63b2..3251671 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1235,6 +1235,7 @@ class Windowing(object):
         trigger=self.triggerfn.to_runner_api(context),
         accumulation_mode=self.accumulation_mode,
         output_time=self.output_time_fn,
+        # TODO(robertwb): Support EMIT_IF_NONEMPTY
         closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
         allowed_lateness=0)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index cc9e0f5..827aa33 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,6 +25,7 @@ import unittest
 import yaml
 
 import apache_beam as beam
+from apache_beam.runners import pipeline_context
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
@@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase):
         AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
         Repeatedly(AfterCount(100)),
         trigger.OrFinally(AfterCount(3), AfterCount(10))):
-      context = beam.pipeline.PipelineContext()
-      self.assertEqual(
-          trigger_fn,
-          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
-
-  def test_windowing_strategy_encoding(self):
-    for trigger_fn in (
-        DefaultTrigger(),
-        AfterAll(AfterCount(1), AfterCount(10)),
-        AfterFirst(AfterCount(10), AfterCount(100)),
-        AfterEach(AfterCount(100), AfterCount(1000)),
-        AfterWatermark(early=AfterCount(1000)),
-        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
-        Repeatedly(AfterCount(100)),
-        trigger.OrFinally(AfterCount(3), AfterCount(10))):
-      context = beam.pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           trigger_fn,
           TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c763a96..3878dff 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -73,6 +73,7 @@ class OutputTimeFn(object):
   OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
   OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
   OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+  # TODO(robertwb): Add this to the runner API or remove it.
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
@@ -167,7 +168,6 @@ class WindowFn(object):
     return pickler.loads(fn_parameter.value)
 
   def to_runner_api_parameter(self, context):
-    raise TypeError(self)
     return (urns.PICKLED_WINDOW_FN,
             wrappers_pb2.BytesValue(value=pickler.dumps(self)))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c79739a..99be02c 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
 
 import unittest
 
-from apache_beam import pipeline
+from apache_beam.runners import pipeline_context
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
@@ -238,7 +238,7 @@ class RunnerApiTest(unittest.TestCase):
                       FixedWindows(37),
                       SlidingWindows(2, 389),
                       Sessions(5077)):
-      context = pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           window_fn,
           WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
@@ -251,7 +251,7 @@ class RunnerApiTest(unittest.TestCase):
         Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
                   output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
                   accumulation_mode=AccumulationMode.DISCARDING)):
-      context = pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           windowing,
           Windowing.from_runner_api(windowing.to_runner_api(context), context))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 186c99c..936e2cb 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -21,4 +21,4 @@ FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
 SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
 
-PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"
+PICKLED_CODER = "beam:coder:pickled_python:v0.1"


[6/9] beam git commit: Runner API translation of triggers and windowing strategies.

Posted by ro...@apache.org.
Runner API translation of triggers and windowing strategies.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b86e1fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b86e1fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b86e1fc

Branch: refs/heads/master
Commit: 5b86e1fc22234a7a6dd00696326fa0fae8fe7a2d
Parents: aad32b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 16:18:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        |  20 +++
 sdks/python/apache_beam/pipeline.py             |   2 +-
 sdks/python/apache_beam/transforms/core.py      |  38 +++++
 sdks/python/apache_beam/transforms/trigger.py   | 143 ++++++++++++++++++-
 .../apache_beam/transforms/trigger_test.py      |  33 +++++
 sdks/python/apache_beam/transforms/window.py    |  34 +++--
 .../apache_beam/transforms/window_test.py       |  23 ++-
 7 files changed, 272 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1d29f32..fd72af8 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -22,6 +22,8 @@ import cPickle as pickle
 import google.protobuf
 
 from apache_beam.coders import coder_impl
+from apache_beam.utils import urns
+from apache_beam.utils import proto_utils
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -182,6 +184,24 @@ class Coder(object):
             and self._dict_without_impl() == other._dict_without_impl())
     # pylint: enable=protected-access
 
+  def to_runner_api(self, context):
+    # TODO(BEAM-115): Use specialized URNs and components.
+    from apache_beam.runners.api import beam_runner_api_pb2
+    return beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(
+            spec=beam_runner_api_pb2.UrnWithParameter(
+                urn=urns.PICKLED_CODER,
+                parameter=proto_utils.pack_Any(
+                    google.protobuf.wrappers_pb2.BytesValue(
+                        value=serialize_coder(self))))))
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    any_proto = proto.spec.spec.parameter
+    bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
+    any_proto.Unpack(bytes_proto)
+    return deserialize_coder(bytes_proto.value)
+
 
 class StrUtf8Coder(Coder):
   """A coder used for reading and writing strings as UTF-8."""

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 4ec2e47..9edcf9b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -499,6 +499,6 @@ class PipelineContext(object):
 
   def to_runner_api(self):
     context_proto = beam_runner_api_pb2.Components()
-    for name, cls in self._COMPONENT_TYEPS:
+    for name, cls in self._COMPONENT_TYPES:
       getattr(self, name).populate_map(getattr(context_proto, name))
     return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7abd784..1fc63b2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -27,6 +27,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.coders import typecoders
 from apache_beam.internal import util
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -49,6 +50,7 @@ from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.utils.pipeline_options import TypeOptions
 
+
 # Type variables
 T = typehints.TypeVariable('T')
 K = typehints.TypeVariable('K')
@@ -1207,9 +1209,45 @@ class Windowing(object):
                                           self.accumulation_mode,
                                           self.output_time_fn)
 
+  def __eq__(self, other):
+    # Different types are unequal; two default windowings are always equal.
+    if type(self) != type(other):
+      return False
+    if self._is_default and other._is_default:
+      return True
+    return (self.windowfn == other.windowfn
+            and self.triggerfn == other.triggerfn
+            and self.accumulation_mode == other.accumulation_mode
+            and self.output_time_fn == other.output_time_fn)
+
   def is_default(self):
     return self._is_default
 
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.WindowingStrategy(
+        window_fn=self.windowfn.to_runner_api(context),
+        # TODO(robertwb): Prohibit implicit multi-level merging.
+        merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+                      if self.windowfn.is_merging()
+                      else beam_runner_api_pb2.NON_MERGING),
+        window_coder_id=context.coders.get_id(
+            self.windowfn.get_window_coder()),
+        trigger=self.triggerfn.to_runner_api(context),
+        accumulation_mode=self.accumulation_mode,
+        output_time=self.output_time_fn,
+        closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+        allowed_lateness=0)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.transforms.trigger import TriggerFn
+    return Windowing(
+        windowfn=WindowFn.from_runner_api(proto.window_fn, context),
+        triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
+        accumulation_mode=proto.accumulation_mode,
+        output_time_fn=proto.output_time)
+
 
 @typehints.with_input_types(T)
 @typehints.with_output_types(T)

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 04198ba..b55d602 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import OutputTimeFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.runners.api import beam_runner_api_pb2
 
 
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times.
   """
-  DISCARDING = 1
-  ACCUMULATING = 2
+  DISCARDING = beam_runner_api_pb2.DISCARDING
+  ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
   # RETRACTING = 3
 
@@ -185,6 +186,26 @@ class TriggerFn(object):
     pass
 # pylint: enable=unused-argument
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return {
+        'after_all': AfterAll,
+        'after_any': AfterFirst,
+        'after_each': AfterEach,
+        'after_end_of_widow': AfterWatermark,
+        # after_processing_time, after_synchronized_processing_time
+        # always
+        'default': DefaultTrigger,
+        'element_count': AfterCount,
+        # never
+        'or_finally': OrFinally,
+        'repeat': Repeatedly,
+    }[proto.WhichOneof('trigger')].from_runner_api(proto, context)
+
+  @abstractmethod
+  def to_runner_api(self, unused_context):
+    pass
+
 
 class DefaultTrigger(TriggerFn):
   """Semantically Repeatedly(AfterWatermark()), but more optimized."""
@@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn):
   def __eq__(self, other):
     return type(self) == type(other)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return DefaultTrigger()
+
+  def to_runner_api(self, unused_context):
+    return beam_runner_api_pb2.Trigger(
+        default=beam_runner_api_pb2.Trigger.Default())
+
 
 class AfterWatermark(TriggerFn):
   """Fire exactly once when the watermark passes the end of the window.
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn):
   def __repr__(self):
     qualifiers = []
     if self.early:
-      qualifiers.append('early=%s' % self.early)
+      qualifiers.append('early=%s' % self.early.underlying)
     if self.late:
-      qualifiers.append('late=%s' % self.late)
+      qualifiers.append('late=%s' % self.late.underlying)
     return 'AfterWatermark(%s)' % ', '.join(qualifiers)
 
   def is_late(self, context):
@@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn):
   def __hash__(self):
     return hash((type(self), self.early, self.late))
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return AfterWatermark(
+        early=TriggerFn.from_runner_api(
+            proto.after_end_of_widow.early_firings, context)
+        if proto.after_end_of_widow.HasField('early_firings')
+        else None,
+        late=TriggerFn.from_runner_api(
+            proto.after_end_of_widow.late_firings, context)
+        if proto.after_end_of_widow.HasField('late_firings')
+        else None)
+
+  def to_runner_api(self, context):
+    early_proto = self.early.underlying.to_runner_api(
+        context) if self.early else None
+    late_proto = self.late.underlying.to_runner_api(
+        context) if self.late else None
+    return beam_runner_api_pb2.Trigger(
+        after_end_of_widow=beam_runner_api_pb2.Trigger.AfterEndOfWindow(
+            early_firings=early_proto,
+            late_firings=late_proto))
+
 
 class AfterCount(TriggerFn):
   """Fire when there are at least count elements in this window pane."""
@@ -317,6 +368,9 @@ class AfterCount(TriggerFn):
   def __repr__(self):
     return 'AfterCount(%s)' % self.count
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.count == other.count
+
   def on_element(self, element, window, context):
     context.add_state(self.COUNT_TAG, 1)
 
@@ -333,6 +387,15 @@ class AfterCount(TriggerFn):
   def reset(self, window, context):
     context.clear_state(self.COUNT_TAG)
 
+  @staticmethod
+  def from_runner_api(proto, unused_context):
+    return AfterCount(proto.element_count.element_count)
+
+  def to_runner_api(self, unused_context):
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=self.count))
+
 
 class Repeatedly(TriggerFn):
   """Repeatedly invoke the given trigger, never finishing."""
@@ -343,6 +406,9 @@ class Repeatedly(TriggerFn):
   def __repr__(self):
     return 'Repeatedly(%s)' % self.underlying
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.underlying == other.underlying
+
   def on_element(self, element, window, context):  # get window from context?
     self.underlying.on_element(element, window, context)
 
@@ -360,6 +426,16 @@ class Repeatedly(TriggerFn):
   def reset(self, window, context):
     self.underlying.reset(window, context)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return Repeatedly(
+        TriggerFn.from_runner_api(proto.repeat.subtrigger, context))
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        repeat=beam_runner_api_pb2.Trigger.Repeat(
+            subtrigger=self.underlying.to_runner_api(context)))
+
 
 class ParallelTriggerFn(TriggerFn):
 
@@ -372,6 +448,9 @@ class ParallelTriggerFn(TriggerFn):
     return '%s(%s)' % (self.__class__.__name__,
                        ', '.join(str(t) for t in self.triggers))
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.triggers == other.triggers
+
   @abstractmethod
   def combine_op(self, trigger_results):
     pass
@@ -406,6 +485,31 @@ class ParallelTriggerFn(TriggerFn):
   def _sub_context(context, index):
     return NestedContext(context, '%d/' % index)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    subtriggers = [
+        TriggerFn.from_runner_api(subtrigger, context)
+        for subtrigger
+        in proto.after_all.subtriggers or proto.after_any.subtriggers]
+    if proto.after_all.subtriggers:
+      return AfterAll(*subtriggers)
+    else:
+      return AfterFirst(*subtriggers)
+
+  def to_runner_api(self, context):
+    subtriggers = [
+        subtrigger.to_runner_api(context) for subtrigger in self.triggers]
+    if self.combine_op == all:
+      return beam_runner_api_pb2.Trigger(
+          after_all=beam_runner_api_pb2.Trigger.AfterAll(
+              subtriggers=subtriggers))
+    elif self.combine_op == any:
+      return beam_runner_api_pb2.Trigger(
+          after_any=beam_runner_api_pb2.Trigger.AfterAny(
+              subtriggers=subtriggers))
+    else:
+      raise NotImplementedError(self)
+
 
 class AfterFirst(ParallelTriggerFn):
   """Fires when any subtrigger fires.
@@ -435,6 +539,9 @@ class AfterEach(TriggerFn):
     return '%s(%s)' % (self.__class__.__name__,
                        ', '.join(str(t) for t in self.triggers))
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self.triggers == other.triggers
+
   def on_element(self, element, window, context):
     ix = context.get_state(self.INDEX_TAG)
     if ix < len(self.triggers):
@@ -474,12 +581,40 @@ class AfterEach(TriggerFn):
   def _sub_context(context, index):
     return NestedContext(context, '%d/' % index)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return AfterEach(*[
+        TriggerFn.from_runner_api(subtrigger, context)
+        for subtrigger in proto.after_each.subtriggers])
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        after_each=beam_runner_api_pb2.Trigger.AfterEach(
+            subtriggers=[
+                subtrigger.to_runner_api(context)
+                for subtrigger in self.triggers]))
+
 
 class OrFinally(AfterFirst):
 
   def __init__(self, body_trigger, exit_trigger):
     super(OrFinally, self).__init__(body_trigger, exit_trigger)
 
+  @staticmethod
+  def from_runner_api(proto, context):
+    return OrFinally(
+        TriggerFn.from_runner_api(proto.or_finally.main, context),
+        # getattr is used as finally is a keyword in Python
+        TriggerFn.from_runner_api(getattr(proto.or_finally, 'finally'),
+                                  context))
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        or_finally=beam_runner_api_pb2.Trigger.OrFinally(
+            main=self.triggers[0].to_runner_api(context),
+            # dict keyword argument is used as finally is a keyword in Python
+            **{'finally': self.triggers[1].to_runner_api(context)}))
+
 
 class TriggerContext(object):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 72bab2e..cc9e0f5 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -38,6 +38,7 @@ from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
 from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import TriggerFn
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
@@ -380,6 +381,38 @@ class TriggerTest(unittest.TestCase):
                        range(10))
 
 
+class RunnerApiTest(unittest.TestCase):
+
+  def test_trigger_encoding(self):
+    for trigger_fn in (
+        DefaultTrigger(),
+        AfterAll(AfterCount(1), AfterCount(10)),
+        AfterFirst(AfterCount(10), AfterCount(100)),
+        AfterWatermark(early=AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+        Repeatedly(AfterCount(100)),
+        trigger.OrFinally(AfterCount(3), AfterCount(10))):
+      context = beam.pipeline.PipelineContext()
+      self.assertEqual(
+          trigger_fn,
+          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+  def test_windowing_strategy_encoding(self):
+    for trigger_fn in (
+        DefaultTrigger(),
+        AfterAll(AfterCount(1), AfterCount(10)),
+        AfterFirst(AfterCount(10), AfterCount(100)),
+        AfterEach(AfterCount(100), AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000)),
+        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+        Repeatedly(AfterCount(100)),
+        trigger.OrFinally(AfterCount(3), AfterCount(10))):
+      context = beam.pipeline.PipelineContext()
+      self.assertEqual(
+          trigger_fn,
+          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+
 class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a562bcf..c763a96 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -70,9 +70,9 @@ from apache_beam.utils import urns
 class OutputTimeFn(object):
   """Determines how output timestamps of grouping operations are assigned."""
 
-  OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
-  OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
-  OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
+  OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
+  OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
+  OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
@@ -116,6 +116,10 @@ class WindowFn(object):
     """Returns a window that is the result of merging a set of windows."""
     raise NotImplementedError
 
+  def is_merging(self):
+    """Returns whether this WindowFn merges windows."""
+    return True
+
   def get_window_coder(self):
     return coders.WindowCoder()
 
@@ -267,7 +271,16 @@ class GlobalWindow(BoundedWindow):
     return self is other or type(self) is type(other)
 
 
-class GlobalWindows(WindowFn):
+class NonMergingWindowFn(WindowFn):
+
+  def is_merging(self):
+    return False
+
+  def merge(self, merge_context):
+    pass  # No merging.
+
+
+class GlobalWindows(NonMergingWindowFn):
   """A windowing function that assigns everything to one global window."""
 
   @classmethod
@@ -277,9 +290,6 @@ class GlobalWindows(WindowFn):
   def assign(self, assign_context):
     return [GlobalWindow()]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def get_window_coder(self):
     return coders.GlobalWindowCoder()
 
@@ -304,7 +314,7 @@ WindowFn.register_urn(
     urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
 
 
-class FixedWindows(WindowFn):
+class FixedWindows(NonMergingWindowFn):
   """A windowing function that assigns each element to one time interval.
 
   The attributes size and offset determine in what time interval a timestamp
@@ -329,9 +339,6 @@ class FixedWindows(WindowFn):
     start = timestamp - (timestamp - self.offset) % self.size
     return [IntervalWindow(start, start + self.size)]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def __eq__(self, other):
     if type(self) == type(other) == FixedWindows:
       return self.size == other.size and self.offset == other.offset
@@ -356,7 +363,7 @@ WindowFn.register_urn(
     FixedWindows.from_runner_api_parameter)
 
 
-class SlidingWindows(WindowFn):
+class SlidingWindows(NonMergingWindowFn):
   """A windowing function that assigns each element to a set of sliding windows.
 
   The attributes size and offset determine in what time interval a timestamp
@@ -384,9 +391,6 @@ class SlidingWindows(WindowFn):
     return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
             for s in range(start, start - self.size, -self.period)]
 
-  def merge(self, merge_context):
-    pass  # No merging.
-
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:
       return (self.size == other.size

http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 821b143..c79739a 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,13 +28,17 @@ from apache_beam.transforms import Create
 from apache_beam.transforms import GroupByKey
 from apache_beam.transforms import Map
 from apache_beam.transforms import WindowInto
+from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.timeutil import MAX_TIMESTAMP
 from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import OutputTimeFn
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
@@ -226,7 +230,10 @@ class WindowTest(unittest.TestCase):
                 label='assert:mean')
     p.run()
 
-  def test_runner_api(self):
+
+class RunnerApiTest(unittest.TestCase):
+
+  def test_windowfn_encoding(self):
     for window_fn in (GlobalWindows(),
                       FixedWindows(37),
                       SlidingWindows(2, 389),
@@ -236,5 +243,19 @@ class WindowTest(unittest.TestCase):
           window_fn,
           WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
 
+  def test_windowing_encoding(self):
+    for windowing in (
+        Windowing(GlobalWindows()),
+        Windowing(FixedWindows(1, 3), AfterCount(6),
+                  accumulation_mode=AccumulationMode.ACCUMULATING),
+        Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
+                  output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+                  accumulation_mode=AccumulationMode.DISCARDING)):
+      context = pipeline.PipelineContext()
+      self.assertEqual(
+          windowing,
+          Windowing.from_runner_api(windowing.to_runner_api(context), context))
+
+
 if __name__ == '__main__':
   unittest.main()


[4/9] beam git commit: Auto-generated runner api proto bindings.

Posted by ro...@apache.org.
Auto-generated runner api proto bindings.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bb125e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bb125e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bb125e1

Branch: refs/heads/master
Commit: 3bb125e12d625216c234fe396168843e6669c1e5
Parents: f13a84d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:02:08 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:00 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/api/__init__.py |    0
 .../runners/api/beam_runner_api_pb2.py          | 2755 ++++++++++++++++++
 sdks/python/run_pylint.sh                       |    3 +-
 3 files changed, 2757 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
new file mode 100644
index 0000000..e69de29


[9/9] beam git commit: Closes #2190

Posted by ro...@apache.org.
Closes #2190


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c2424cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c2424cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c2424cb

Branch: refs/heads/master
Commit: 2c2424cb44bb2976ea9099230106a639b5ee3993
Parents: f13a84d deff128
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 20:29:03 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:03 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        |  113 +
 sdks/python/apache_beam/runners/api/__init__.py |   16 +
 .../runners/api/beam_runner_api_pb2.py          | 2772 ++++++++++++++++++
 .../apache_beam/runners/pipeline_context.py     |   88 +
 .../runners/pipeline_context_test.py            |   49 +
 sdks/python/apache_beam/transforms/core.py      |   39 +
 sdks/python/apache_beam/transforms/trigger.py   |  143 +-
 .../apache_beam/transforms/trigger_test.py      |   19 +
 sdks/python/apache_beam/transforms/window.py    |  147 +-
 .../apache_beam/transforms/window_test.py       |   32 +
 sdks/python/apache_beam/utils/proto_utils.py    |   54 +
 sdks/python/apache_beam/utils/urns.py           |   24 +
 sdks/python/run_pylint.sh                       |    3 +-
 13 files changed, 3481 insertions(+), 18 deletions(-)
----------------------------------------------------------------------