You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/10 04:29:33 UTC
[1/9] beam git commit: Runner API context helper classes.
Repository: beam
Updated Branches:
refs/heads/master f13a84d67 -> 2c2424cb4
Runner API context helper classes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186
Branch: refs/heads/master
Commit: bc76a186099568ef292ceb007388ae7174150bc2
Parents: 3bb125e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:04:27 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:00 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 62 ++++++++++++++++++++++++++++++++
1 file changed, 62 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bc76a186/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7db39a9..4ec2e47 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,11 +52,14 @@ import os
import shutil
import tempfile
+from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -440,3 +443,62 @@ class AppliedPTransform(object):
if v not in visited:
visited.add(v)
visitor.visit_value(v, self)
+
+
+class PipelineContextMap(object):
+  """A bi-directional map between objects and their runner API ids.
+
+  Objects are encoded to a runner API representation the first time they are
+  registered (via ``obj.to_runner_api(context)``) and decoded lazily the first
+  time they are looked up by id (via ``obj_type.from_runner_api``).
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    """Creates the map.
+
+    Args:
+      context: the owning context; passed to every to_runner_api and
+        from_runner_api call.
+      obj_type: class of the stored objects. It must provide from_runner_api,
+        and its __name__ is embedded in generated ids.
+      proto_map: optional existing id -> proto mapping to decode objects from.
+        A falsy value (None or empty) is replaced by a fresh dict.
+    """
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    self._id_to_proto = proto_map if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    """Returns a fresh id of the form ``ref_<TypeName>_<n>``."""
+    self._counter += 1
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+    """Copies every known id -> proto entry into proto_map."""
+    # Iterate over (id, proto) pairs. Iterating the dict directly yields only
+    # its keys, which cannot be unpacked into two names.
+    # NOTE(review): protobuf message-valued map fields may reject direct item
+    # assignment; confirm whether proto_map[id].CopyFrom(proto) is required
+    # when proto_map is a field of a Components message.
+    for id, proto in self._id_to_proto.items():
+      proto_map[id] = proto
+
+  def get_id(self, obj):
+    """Returns the id of obj, registering and encoding it if it is new."""
+    if obj not in self._obj_to_id:
+      id = self._unique_ref()
+      # Record the id before encoding so the mapping already exists while
+      # obj.to_runner_api runs.
+      self._id_to_obj[id] = obj
+      self._obj_to_id[obj] = id
+      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    """Returns the object for id, decoding it from its proto on first use.
+
+    Raises:
+      KeyError: if id is neither a registered object nor a known proto.
+    """
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+          self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+  """Holds one PipelineContextMap per kind of runner API component.
+
+  Every key of _COMPONENT_TYPES becomes an instance attribute of the same
+  name (for example ``context.coders``) whose value is the map for that type.
+  """
+
+  _COMPONENT_TYPES = {
+      'transforms': AppliedPTransform,
+      'pcollections': pvalue.PCollection,
+      'coders': coders.Coder,
+      'windowing_strategies': core.Windowing,
+      # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    """Builds the component maps.
+
+    Args:
+      context_proto: optional object whose attributes named after the
+        component kinds seed the corresponding maps; missing attributes (or
+        None) leave a map empty.
+    """
+    for name, cls in self._COMPONENT_TYPES.items():
+      setattr(self, name,
+              PipelineContextMap(self, cls, getattr(context_proto, name, None)))
+
+  def to_runner_api(self):
+    """Returns a Components proto filled from every component map."""
+    context_proto = beam_runner_api_pb2.Components()
+    # Only the component names are needed; iterating the dict yields its keys.
+    for name in self._COMPONENT_TYPES:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto
[7/9] beam git commit: Runner API encoding of WindowFns.
Posted by ro...@apache.org.
Runner API encoding of WindowFns.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a
Branch: refs/heads/master
Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f
Parents: bc76a18
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:21:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/window.py | 117 +++++++++++++++++++
.../apache_beam/transforms/window_test.py | 11 ++
sdks/python/apache_beam/utils/proto_utils.py | 37 ++++++
sdks/python/apache_beam/utils/urns.py | 7 ++
4 files changed, 172 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 14cf2f6..a562bcf 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,13 +49,20 @@ WindowFn.
from __future__ import absolute_import
+from google.protobuf import struct_pb2
+from google.protobuf import wrappers_pb2
+
from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import timeutil
from apache_beam.transforms.timeutil import Duration
from apache_beam.transforms.timeutil import MAX_TIMESTAMP
from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.timeutil import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
@@ -131,6 +138,41 @@ class WindowFn(object):
# By default, just return the input timestamp.
return input_timestamp
+  # Maps urn -> (parameter_type, constructor); populated by register_urn.
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    """Registers how to decode a WindowFn that was encoded with urn.
+
+    Args:
+      urn: the urn string identifying the encoding.
+      parameter_type: proto message class the parameter is unpacked into, or
+        None when the encoding carries no parameter.
+      constructor: callable taking (parameter, context) and returning the
+        decoded WindowFn.
+    """
+    cls._known_urns[urn] = parameter_type, constructor
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    """Decodes fn_proto using the constructor registered for its urn.
+
+    Raises:
+      KeyError: if no constructor was registered for fn_proto.spec.urn.
+    """
+    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+    return constructor(
+        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        context)
+
+  def to_runner_api(self, context):
+    """Encodes this WindowFn as a FunctionSpec proto.
+
+    The urn and parameter come from to_runner_api_parameter; the parameter is
+    wrapped in a protobuf Any.
+    """
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        spec=beam_runner_api_pb2.UrnWithParameter(
+            urn=urn,
+            parameter=proto_utils.pack_Any(typed_param)))
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Default decoder: loads the WindowFn from the pickled bytes payload."""
+    # NOTE(review): unpickling can execute arbitrary code; presumably these
+    # protos only ever come from a trusted source -- confirm.
+    return pickler.loads(fn_parameter.value)
+
+  def to_runner_api_parameter(self, context):
+    """Returns the (urn, typed parameter) pair that encodes this WindowFn.
+
+    The default encoding pickles the WindowFn into a BytesValue. It pairs
+    with the PICKLED_WINDOW_FN decoder registered below. Subclasses with a
+    dedicated urn override this method.
+    """
+    return (urns.PICKLED_WINDOW_FN,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+
+WindowFn.register_urn(
+    urns.PICKLED_WINDOW_FN,
+    wrappers_pb2.BytesValue,
+    WindowFn.from_runner_api_parameter)
+
class BoundedWindow(object):
"""A window for timestamps in range (-infinity, end).
@@ -251,6 +293,16 @@ class GlobalWindows(WindowFn):
def __ne__(self, other):
return not self == other
+  @staticmethod
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    """Builds a GlobalWindows; neither argument is consulted."""
+    return GlobalWindows()
+
+  def to_runner_api_parameter(self, context):
+    """Returns the global-windows urn paired with no parameter."""
+    urn_and_parameter = (urns.GLOBAL_WINDOWS_FN, None)
+    return urn_and_parameter
+
+WindowFn.register_urn(
+    urn=urns.GLOBAL_WINDOWS_FN,
+    parameter_type=None,
+    constructor=GlobalWindows.from_runner_api_parameter)
+
class FixedWindows(WindowFn):
"""A windowing function that assigns each element to one time interval.
@@ -280,6 +332,29 @@ class FixedWindows(WindowFn):
def merge(self, merge_context):
pass # No merging.
+  def __eq__(self, other):
+    """Returns True iff other is exactly a FixedWindows with equal fields."""
+    if type(self) == type(other) == FixedWindows:
+      return self.size == other.size and self.offset == other.offset
+    # Mismatched types compare unequal; return an explicit bool instead of
+    # falling off the end and returning None.
+    return False
+
+  def __ne__(self, other):
+    """Returns the negation of __eq__."""
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a FixedWindows from the 'size' and 'offset' struct fields.
+
+    Both fields are read as microsecond counts.
+    """
+    return FixedWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']))
+
+  def to_runner_api_parameter(self, context):
+    """Returns the fixed-windows urn and a Struct of size/offset in micros."""
+    return (urns.FIXED_WINDOWS_FN,
+            proto_utils.pack_Struct(size=self.size.micros,
+                                    offset=self.offset.micros))
+
+WindowFn.register_urn(
+    urns.FIXED_WINDOWS_FN,
+    struct_pb2.Struct,
+    FixedWindows.from_runner_api_parameter)
+
class SlidingWindows(WindowFn):
"""A windowing function that assigns each element to a set of sliding windows.
@@ -312,6 +387,31 @@ class SlidingWindows(WindowFn):
def merge(self, merge_context):
pass # No merging.
+  def __eq__(self, other):
+    """Returns True iff other is exactly a SlidingWindows with equal fields."""
+    if type(self) == type(other) == SlidingWindows:
+      return (self.size == other.size
+              and self.offset == other.offset
+              and self.period == other.period)
+    # Mismatched types compare unequal; return an explicit bool instead of
+    # falling off the end and returning None.
+    return False
+
+  def __ne__(self, other):
+    """Returns the negation of __eq__.
+
+    Defined explicitly, as FixedWindows does, because Python 2 does not derive
+    != from ==.
+    """
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a SlidingWindows from 'size', 'offset' and 'period' fields.
+
+    All three fields are read as microsecond counts.
+    """
+    return SlidingWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']),
+        period=Duration(micros=fn_parameter['period']))
+
+  def to_runner_api_parameter(self, context):
+    """Returns the sliding-windows urn and a Struct of its fields in micros."""
+    return (urns.SLIDING_WINDOWS_FN,
+            proto_utils.pack_Struct(
+                size=self.size.micros,
+                offset=self.offset.micros,
+                period=self.period.micros))
+
+WindowFn.register_urn(
+    urns.SLIDING_WINDOWS_FN,
+    struct_pb2.Struct,
+    SlidingWindows.from_runner_api_parameter)
+
class Sessions(WindowFn):
"""A windowing function that groups elements into sessions.
@@ -352,3 +452,20 @@ class Sessions(WindowFn):
end = w.end
if len(to_merge) > 1:
merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end))
+
+  def __eq__(self, other):
+    """Returns True iff other is exactly a Sessions with the same gap_size."""
+    if type(self) == type(other) == Sessions:
+      return self.gap_size == other.gap_size
+    # Mismatched types compare unequal; return an explicit bool instead of
+    # falling off the end and returning None.
+    return False
+
+  def __ne__(self, other):
+    """Returns the negation of __eq__.
+
+    Defined explicitly, as FixedWindows does, because Python 2 does not derive
+    != from ==.
+    """
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a Sessions from the 'gap_size' field, read as microseconds."""
+    return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
+
+  def to_runner_api_parameter(self, context):
+    """Returns the session-windows urn and a Struct of gap_size in micros."""
+    return (urns.SESSION_WINDOWS_FN,
+            proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+
+WindowFn.register_urn(
+    urns.SESSION_WINDOWS_FN,
+    struct_pb2.Struct,
+    Sessions.from_runner_api_parameter)
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c4072ac..821b143 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,6 +19,7 @@
import unittest
+from apache_beam import pipeline
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
@@ -32,6 +33,7 @@ from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
@@ -224,6 +226,15 @@ class WindowTest(unittest.TestCase):
label='assert:mean')
p.run()
+  def test_runner_api(self):
+    """Checks that each WindowFn equals itself after a runner API round trip."""
+    window_fns = (
+        GlobalWindows(),
+        FixedWindows(37),
+        SlidingWindows(2, 389),
+        Sessions(5077),
+    )
+    for window_fn in window_fns:
+      # A fresh context per WindowFn keeps the round trips independent.
+      context = pipeline.PipelineContext()
+      encoded = window_fn.to_runner_api(context)
+      decoded = WindowFn.from_runner_api(encoded, context)
+      self.assertEqual(window_fn, decoded)
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
new file mode 100644
index 0000000..0ece8f5
--- /dev/null
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -0,0 +1,37 @@
+from google.protobuf import any_pb2
+from google.protobuf import struct_pb2
+
+
+def pack_Any(msg):
+  """Wraps msg in a protobuf Any.
+
+  A msg of None is passed through as None instead of being wrapped.
+  """
+  if msg is None:
+    return None
+  packed = any_pb2.Any()
+  packed.Pack(msg)
+  return packed
+
+
+def unpack_Any(any_msg, msg_class):
+  """Unpacks any_msg into a new instance of msg_class.
+
+  A msg_class of None means there is nothing to unpack; None is returned and
+  any_msg is not touched.
+  """
+  if msg_class is None:
+    return None
+  unpacked = msg_class()
+  any_msg.Unpack(unpacked)
+  return unpacked
+
+
+def pack_Struct(**kwargs):
+  """Builds a protobuf Struct with one field per keyword argument."""
+  struct = struct_pb2.Struct()
+  for field_name in kwargs:
+    struct[field_name] = kwargs[field_name]  # pylint: disable=unsubscriptable-object
+  return struct
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
new file mode 100644
index 0000000..4d1c2f7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -0,0 +1,7 @@
+# URNs identifying the runner API encodings of window functions.
+PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+
+# NOTE(review): presumably the urn for pickled coders. Unlike the urns above
+# it uses a "dataflow:" prefix rather than "beam:" -- confirm this is intended.
+PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"
[5/9] beam git commit: Add license to new files.
Posted by ro...@apache.org.
Add license to new files.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2da21e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2da21e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2da21e2
Branch: refs/heads/master
Commit: b2da21e287660bb3077bf89e092f7aa3c385906b
Parents: 5b86e1f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Mar 8 13:47:53 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/api/__init__.py | 16 ++++++++++++++++
.../apache_beam/runners/api/beam_runner_api_pb2.py | 17 +++++++++++++++++
sdks/python/apache_beam/utils/proto_utils.py | 17 +++++++++++++++++
sdks/python/apache_beam/utils/urns.py | 17 +++++++++++++++++
4 files changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
index e69de29..cce3aca 100644
--- a/sdks/python/apache_beam/runners/api/__init__.py
+++ b/sdks/python/apache_beam/runners/api/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
index 66c331b..f235ce8 100644
--- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: beam_runner_api.proto
http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index 0ece8f5..b4bfdca 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
from google.protobuf import any_pb2
from google.protobuf import struct_pb2
http://git-wip-us.apache.org/repos/asf/beam/blob/b2da21e2/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 4d1c2f7..186c99c 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
[3/9] beam git commit: Auto-generated runner api proto bindings.
Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
new file mode 100644
index 0000000..66c331b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -0,0 +1,2755 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: beam_runner_api.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf.internal import enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='beam_runner_api.proto',
+ package='org.apache.beam.runner_api.v1',
+ syntax='proto3',
+ serialized_pb=_b('\n\x15\x62\x65\x61m_runner_api.proto\x12\x1dorg.apache.beam.runner_api.v1\x1a\x19google/protobuf/any.proto\"\x8d\x07\n\nComponents\x12M\n\ntransforms\x18\x01 \x03(\x0b\x32\x39.org.apache.beam.runner_api.v1.Components.TransformsEntry\x12Q\n\x0cpcollections\x18\x02 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.PcollectionsEntry\x12`\n\x14windowing_strategies\x18\x03 \x03(\x0b\x32\x42.org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry\x12\x45\n\x06\x63oders\x18\x04 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.Components.CodersEntry\x12Q\n\x0c\x65nvironments\x18\x05 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.Components.EnvironmentsEntry\x1a\\\n\x0fTransformsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransform:\x02\x38\x01\x1a_\n\x11PcollectionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollection:\x02\
x38\x01\x1al\n\x18WindowingStrategiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategy:\x02\x38\x01\x1aS\n\x0b\x43odersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.Coder:\x02\x38\x01\x1a_\n\x11\x45nvironmentsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x39\n\x05value\x18\x02 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.Environment:\x02\x38\x01\"\xe4\x06\n\x15MessageWithComponents\x12=\n\ncomponents\x18\x01 \x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x35\n\x05\x63oder\x18\x02 \x01(\x0b\x32$.org.apache.beam.runner_api.v1.CoderH\x00\x12H\n\x0f\x63ombine_payload\x18\x03 \x01(\x0b\x32-.org.apache.beam.runner_api.v1.CombinePayloadH\x00\x12\x44\n\rfunction_spec\x18\x04 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpecH\x00\x12\x45\n\x0epar_do_payload\x18\x06 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.ParDoPayloadH\x00\x12?\
n\nptransform\x18\x07 \x01(\x0b\x32).org.apache.beam.runner_api.v1.PTransformH\x00\x12\x41\n\x0bpcollection\x18\x08 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.PCollectionH\x00\x12\x42\n\x0cread_payload\x18\t \x01(\x0b\x32*.org.apache.beam.runner_api.v1.ReadPayloadH\x00\x12>\n\nside_input\x18\x0b \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInputH\x00\x12O\n\x13window_into_payload\x18\x0c \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowIntoPayloadH\x00\x12N\n\x12windowing_strategy\x18\r \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.WindowingStrategyH\x00\x12M\n\x12urn_with_parameter\x18\x0e \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameterH\x00\x42\x06\n\x04root\"\xa6\x01\n\x08Pipeline\x12=\n\ncomponents\x18\x01 \x01(\x0b\x32).org.apache.beam.runner_api.v1.Components\x12\x19\n\x11root_transform_id\x18\x02 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x03 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xa8\x03\n\nPTransform\x12\x13\n\x0bunique_name\x18\x05
\x01(\t\x12=\n\x04spec\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x15\n\rsubtransforms\x18\x02 \x03(\t\x12\x45\n\x06inputs\x18\x03 \x03(\x0b\x32\x35.org.apache.beam.runner_api.v1.PTransform.InputsEntry\x12G\n\x07outputs\x18\x04 \x03(\x0b\x32\x36.org.apache.beam.runner_api.v1.PTransform.OutputsEntry\x12@\n\x0c\x64isplay_data\x18\x06 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\x1a-\n\x0bInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a.\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd3\x01\n\x0bPCollection\x12\x13\n\x0bunique_name\x18\x01 \x01(\t\x12\x10\n\x08\x63oder_id\x18\x02 \x01(\t\x12<\n\nis_bounded\x18\x03 \x01(\x0e\x32(.org.apache.beam.runner_api.v1.IsBounded\x12\x1d\n\x15windowing_strategy_id\x18\x04 \x01(\t\x12@\n\x0c\x64isplay_data\x18\x05 \x01(\x0b\x32*.org.apache.beam.runner_api.v1.DisplayData\"\xb5\x03\n\x0cParDoPayload\x12:\n\x05
\x64o_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12<\n\nparameters\x18\x02 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.Parameter\x12P\n\x0bside_inputs\x18\x03 \x03(\x0b\x32;.org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry\x12=\n\x0bstate_specs\x18\x04 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.StateSpec\x12=\n\x0btimer_specs\x18\x05 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.TimerSpec\x1a[\n\x0fSideInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInput:\x02\x38\x01\"\x8b\x01\n\tParameter\x12;\n\x04type\x18\x01 \x01(\x0e\x32-.org.apache.beam.runner_api.v1.Parameter.Type\"A\n\x04Type\x12\n\n\x06WINDOW\x10\x00\x12\x14\n\x10PIPELINE_OPTIONS\x10\x01\x12\x17\n\x13RESTRICTION_TRACKER\x10\x02\"\x0b\n\tStateSpec\"\x0b\n\tTimerSpec\"\x88\x01\n\x0bReadPayload\x12;\n\x06source\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12<\n\nis_bounded\x18\x02 \x01(\x0e\x32
(.org.apache.beam.runner_api.v1.IsBounded\"S\n\x11WindowIntoPayload\x12>\n\twindow_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\"\xde\x02\n\x0e\x43ombinePayload\x12?\n\ncombine_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x1c\n\x14\x61\x63\x63umulator_coder_id\x18\x02 \x01(\t\x12<\n\nparameters\x18\x03 \x03(\x0b\x32(.org.apache.beam.runner_api.v1.Parameter\x12R\n\x0bside_inputs\x18\x04 \x03(\x0b\x32=.org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry\x1a[\n\x0fSideInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32(.org.apache.beam.runner_api.v1.SideInput:\x02\x38\x01\"_\n\x05\x43oder\x12\x39\n\x04spec\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x1b\n\x13\x63omponent_coder_ids\x18\x02 \x03(\t\"\xd7\x03\n\x11WindowingStrategy\x12>\n\twindow_fn\x18\x01 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12@\n\x0cmerge_status\x18\x02 \x01(\x0e\x32*.org.apache
.beam.runner_api.v1.MergeStatus\x12\x17\n\x0fwindow_coder_id\x18\x03 \x01(\t\x12\x37\n\x07trigger\x18\x04 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12J\n\x11\x61\x63\x63umulation_mode\x18\x05 \x01(\x0e\x32/.org.apache.beam.runner_api.v1.AccumulationMode\x12>\n\x0boutput_time\x18\x06 \x01(\x0e\x32).org.apache.beam.runner_api.v1.OutputTime\x12H\n\x10\x63losing_behavior\x18\x07 \x01(\x0e\x32..org.apache.beam.runner_api.v1.ClosingBehavior\x12\x18\n\x10\x61llowed_lateness\x18\x08 \x01(\x03\"\xac\r\n\x07Trigger\x12\x44\n\tafter_all\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.Trigger.AfterAllH\x00\x12\x44\n\tafter_any\x18\x02 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.Trigger.AfterAnyH\x00\x12\x46\n\nafter_each\x18\x03 \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.Trigger.AfterEachH\x00\x12U\n\x12\x61\x66ter_end_of_widow\x18\x04 \x01(\x0b\x32\x37.org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindowH\x00\x12[\n\x15\x61\x66ter_processing_time\x18\x05 \x01(\x0b\x32:
.org.apache.beam.runner_api.v1.Trigger.AfterProcessingTimeH\x00\x12t\n\"after_synchronized_processing_time\x18\x06 \x01(\x0b\x32\x46.org.apache.beam.runner_api.v1.Trigger.AfterSynchronizedProcessingTimeH\x00\x12?\n\x06\x61lways\x18\x0c \x01(\x0b\x32-.org.apache.beam.runner_api.v1.Trigger.AlwaysH\x00\x12\x41\n\x07\x64\x65\x66\x61ult\x18\x07 \x01(\x0b\x32..org.apache.beam.runner_api.v1.Trigger.DefaultH\x00\x12L\n\relement_count\x18\x08 \x01(\x0b\x32\x33.org.apache.beam.runner_api.v1.Trigger.ElementCountH\x00\x12=\n\x05never\x18\t \x01(\x0b\x32,.org.apache.beam.runner_api.v1.Trigger.NeverH\x00\x12\x46\n\nor_finally\x18\n \x01(\x0b\x32\x30.org.apache.beam.runner_api.v1.Trigger.OrFinallyH\x00\x12?\n\x06repeat\x18\x0b \x01(\x0b\x32-.org.apache.beam.runner_api.v1.Trigger.RepeatH\x00\x1aG\n\x08\x41\x66terAll\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1aG\n\x08\x41\x66terAny\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Tr
igger\x1aH\n\tAfterEach\x12;\n\x0bsubtriggers\x18\x01 \x03(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x8f\x01\n\x10\x41\x66terEndOfWindow\x12=\n\rearly_firings\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12<\n\x0clate_firings\x18\x02 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x66\n\x13\x41\x66terProcessingTime\x12O\n\x14timestamp_transforms\x18\x01 \x03(\x0b\x32\x31.org.apache.beam.runner_api.v1.TimestampTransform\x1a!\n\x1f\x41\x66terSynchronizedProcessingTime\x1a\t\n\x07\x44\x65\x66\x61ult\x1a%\n\x0c\x45lementCount\x12\x15\n\relement_count\x18\x01 \x01(\x05\x1a\x07\n\x05Never\x1a\x08\n\x06\x41lways\x1az\n\tOrFinally\x12\x34\n\x04main\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x12\x37\n\x07\x66inally\x18\x02 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.Trigger\x1a\x44\n\x06Repeat\x12:\n\nsubtrigger\x18\x01 \x01(\x0b\x32&.org.apache.beam.runner_api.v1.TriggerB\t\n\x07trigger\"\x8e\x02\n\x12TimestampTransform\x12H\n\x05\x64\x
65lay\x18\x01 \x01(\x0b\x32\x37.org.apache.beam.runner_api.v1.TimestampTransform.DelayH\x00\x12M\n\x08\x61lign_to\x18\x02 \x01(\x0b\x32\x39.org.apache.beam.runner_api.v1.TimestampTransform.AlignToH\x00\x1a\x1d\n\x05\x44\x65lay\x12\x14\n\x0c\x64\x65lay_millis\x18\x01 \x01(\x03\x1a)\n\x07\x41lignTo\x12\x0e\n\x06period\x18\x03 \x01(\x03\x12\x0e\n\x06offset\x18\x04 \x01(\x03\x42\x15\n\x13timestamp_transform\"\xda\x01\n\tSideInput\x12G\n\x0e\x61\x63\x63\x65ss_pattern\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12<\n\x07view_fn\x18\x02 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\x12\x46\n\x11window_mapping_fn\x18\x03 \x01(\x0b\x32+.org.apache.beam.runner_api.v1.FunctionSpec\"\x1a\n\x0b\x45nvironment\x12\x0b\n\x03url\x18\x01 \x01(\t\"e\n\x0c\x46unctionSpec\x12=\n\x04spec\x18\x01 \x01(\x0b\x32/.org.apache.beam.runner_api.v1.UrnWithParameter\x12\x16\n\x0e\x65nvironment_id\x18\x02 \x01(\t\"H\n\x10UrnWithParameter\x12\x0b\n\x03urn\x18\x01 \x01(\t\x12\'\
n\tparameter\x18\x02 \x01(\x0b\x32\x14.google.protobuf.Any\"\xf7\x03\n\x0b\x44isplayData\x12>\n\x05items\x18\x01 \x03(\x0b\x32/.org.apache.beam.runner_api.v1.DisplayData.Item\x1a\x46\n\nIdentifier\x12\x14\n\x0ctransform_id\x18\x01 \x01(\t\x12\x15\n\rtransform_urn\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x1a\xf9\x01\n\x04Item\x12\x41\n\x02id\x18\x01 \x01(\x0b\x32\x35.org.apache.beam.runner_api.v1.DisplayData.Identifier\x12=\n\x04type\x18\x02 \x01(\x0e\x32/.org.apache.beam.runner_api.v1.DisplayData.Type\x12#\n\x05value\x18\x03 \x01(\x0b\x32\x14.google.protobuf.Any\x12)\n\x0bshort_value\x18\x04 \x01(\x0b\x32\x14.google.protobuf.Any\x12\r\n\x05label\x18\x05 \x01(\t\x12\x10\n\x08link_url\x18\x06 \x01(\t\"d\n\x04Type\x12\n\n\x06STRING\x10\x00\x12\x0b\n\x07INTEGER\x10\x01\x12\t\n\x05\x46LOAT\x10\x02\x12\x0b\n\x07\x42OOLEAN\x10\x03\x12\r\n\tTIMESTAMP\x10\x04\x12\x0c\n\x08\x44URATION\x10\x05\x12\x0e\n\nJAVA_CLASS\x10\x06*\'\n\tIsBounded\x12\x0b\n\x07\x42OUNDED\x10\x00\x12\r\n\tUNBOUN
DED\x10\x01*C\n\x0bMergeStatus\x12\x0f\n\x0bNON_MERGING\x10\x00\x12\x0f\n\x0bNEEDS_MERGE\x10\x01\x12\x12\n\x0e\x41LREADY_MERGED\x10\x02*4\n\x10\x41\x63\x63umulationMode\x12\x0e\n\nDISCARDING\x10\x00\x12\x10\n\x0c\x41\x43\x43UMULATING\x10\x01*8\n\x0f\x43losingBehavior\x12\x0f\n\x0b\x45MIT_ALWAYS\x10\x00\x12\x14\n\x10\x45MIT_IF_NONEMPTY\x10\x01*I\n\nOutputTime\x12\x11\n\rEND_OF_WINDOW\x10\x00\x12\x12\n\x0eLATEST_IN_PANE\x10\x01\x12\x14\n\x10\x45\x41RLIEST_IN_PANE\x10\x02*S\n\nTimeDomain\x12\x0e\n\nEVENT_TIME\x10\x00\x12\x13\n\x0fPROCESSING_TIME\x10\x01\x12 \n\x1cSYNCHRONIZED_PROCESSING_TIME\x10\x02\x42\x31\n$org.apache.beam.sdk.common.runner.v1B\tRunnerApib\x06proto3')
+ ,
+ dependencies=[google_dot_protobuf_dot_any__pb2.DESCRIPTOR,])
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+_ISBOUNDED = _descriptor.EnumDescriptor(
+ name='IsBounded',
+ full_name='org.apache.beam.runner_api.v1.IsBounded',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='BOUNDED', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='UNBOUNDED', index=1, number=1,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7348,
+ serialized_end=7387,
+)
+_sym_db.RegisterEnumDescriptor(_ISBOUNDED)
+
+IsBounded = enum_type_wrapper.EnumTypeWrapper(_ISBOUNDED)
+_MERGESTATUS = _descriptor.EnumDescriptor(
+ name='MergeStatus',
+ full_name='org.apache.beam.runner_api.v1.MergeStatus',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='NON_MERGING', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='NEEDS_MERGE', index=1, number=1,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='ALREADY_MERGED', index=2, number=2,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7389,
+ serialized_end=7456,
+)
+_sym_db.RegisterEnumDescriptor(_MERGESTATUS)
+
+MergeStatus = enum_type_wrapper.EnumTypeWrapper(_MERGESTATUS)
+_ACCUMULATIONMODE = _descriptor.EnumDescriptor(
+ name='AccumulationMode',
+ full_name='org.apache.beam.runner_api.v1.AccumulationMode',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='DISCARDING', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='ACCUMULATING', index=1, number=1,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7458,
+ serialized_end=7510,
+)
+_sym_db.RegisterEnumDescriptor(_ACCUMULATIONMODE)
+
+AccumulationMode = enum_type_wrapper.EnumTypeWrapper(_ACCUMULATIONMODE)
+_CLOSINGBEHAVIOR = _descriptor.EnumDescriptor(
+ name='ClosingBehavior',
+ full_name='org.apache.beam.runner_api.v1.ClosingBehavior',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='EMIT_ALWAYS', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='EMIT_IF_NONEMPTY', index=1, number=1,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7512,
+ serialized_end=7568,
+)
+_sym_db.RegisterEnumDescriptor(_CLOSINGBEHAVIOR)
+
+ClosingBehavior = enum_type_wrapper.EnumTypeWrapper(_CLOSINGBEHAVIOR)
+_OUTPUTTIME = _descriptor.EnumDescriptor(
+ name='OutputTime',
+ full_name='org.apache.beam.runner_api.v1.OutputTime',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='END_OF_WINDOW', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='LATEST_IN_PANE', index=1, number=1,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='EARLIEST_IN_PANE', index=2, number=2,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7570,
+ serialized_end=7643,
+)
+_sym_db.RegisterEnumDescriptor(_OUTPUTTIME)
+
+OutputTime = enum_type_wrapper.EnumTypeWrapper(_OUTPUTTIME)
+_TIMEDOMAIN = _descriptor.EnumDescriptor(
+ name='TimeDomain',
+ full_name='org.apache.beam.runner_api.v1.TimeDomain',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='EVENT_TIME', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='PROCESSING_TIME', index=1, number=1,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='SYNCHRONIZED_PROCESSING_TIME', index=2, number=2,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7645,
+ serialized_end=7728,
+)
+_sym_db.RegisterEnumDescriptor(_TIMEDOMAIN)
+
+TimeDomain = enum_type_wrapper.EnumTypeWrapper(_TIMEDOMAIN)
+BOUNDED = 0
+UNBOUNDED = 1
+NON_MERGING = 0
+NEEDS_MERGE = 1
+ALREADY_MERGED = 2
+DISCARDING = 0
+ACCUMULATING = 1
+EMIT_ALWAYS = 0
+EMIT_IF_NONEMPTY = 1
+END_OF_WINDOW = 0
+LATEST_IN_PANE = 1
+EARLIEST_IN_PANE = 2
+EVENT_TIME = 0
+PROCESSING_TIME = 1
+SYNCHRONIZED_PROCESSING_TIME = 2
+
+
+_PARAMETER_TYPE = _descriptor.EnumDescriptor(
+ name='Type',
+ full_name='org.apache.beam.runner_api.v1.Parameter.Type',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='WINDOW', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='PIPELINE_OPTIONS', index=1, number=1,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='RESTRICTION_TRACKER', index=2, number=2,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=3191,
+ serialized_end=3256,
+)
+_sym_db.RegisterEnumDescriptor(_PARAMETER_TYPE)
+
+_DISPLAYDATA_TYPE = _descriptor.EnumDescriptor(
+ name='Type',
+ full_name='org.apache.beam.runner_api.v1.DisplayData.Type',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='STRING', index=0, number=0,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='INTEGER', index=1, number=1,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='FLOAT', index=2, number=2,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='BOOLEAN', index=3, number=3,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='TIMESTAMP', index=4, number=4,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='DURATION', index=5, number=5,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='JAVA_CLASS', index=6, number=6,
+ options=None,
+ type=None),
+ ],
+ containing_type=None,
+ options=None,
+ serialized_start=7246,
+ serialized_end=7346,
+)
+_sym_db.RegisterEnumDescriptor(_DISPLAYDATA_TYPE)
+
+
+_COMPONENTS_TRANSFORMSENTRY = _descriptor.Descriptor(
+ name='TransformsEntry',
+ full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.Components.TransformsEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=512,
+ serialized_end=604,
+)
+
+_COMPONENTS_PCOLLECTIONSENTRY = _descriptor.Descriptor(
+ name='PcollectionsEntry',
+ full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.Components.PcollectionsEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=606,
+ serialized_end=701,
+)
+
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY = _descriptor.Descriptor(
+ name='WindowingStrategiesEntry',
+ full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.Components.WindowingStrategiesEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=703,
+ serialized_end=811,
+)
+
+_COMPONENTS_CODERSENTRY = _descriptor.Descriptor(
+ name='CodersEntry',
+ full_name='org.apache.beam.runner_api.v1.Components.CodersEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.Components.CodersEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.Components.CodersEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=813,
+ serialized_end=896,
+)
+
+_COMPONENTS_ENVIRONMENTSENTRY = _descriptor.Descriptor(
+ name='EnvironmentsEntry',
+ full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.Components.EnvironmentsEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=898,
+ serialized_end=993,
+)
+
+_COMPONENTS = _descriptor.Descriptor(
+ name='Components',
+ full_name='org.apache.beam.runner_api.v1.Components',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='transforms', full_name='org.apache.beam.runner_api.v1.Components.transforms', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='pcollections', full_name='org.apache.beam.runner_api.v1.Components.pcollections', index=1,
+ number=2, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='windowing_strategies', full_name='org.apache.beam.runner_api.v1.Components.windowing_strategies', index=2,
+ number=3, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='coders', full_name='org.apache.beam.runner_api.v1.Components.coders', index=3,
+ number=4, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='environments', full_name='org.apache.beam.runner_api.v1.Components.environments', index=4,
+ number=5, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_COMPONENTS_TRANSFORMSENTRY, _COMPONENTS_PCOLLECTIONSENTRY, _COMPONENTS_WINDOWINGSTRATEGIESENTRY, _COMPONENTS_CODERSENTRY, _COMPONENTS_ENVIRONMENTSENTRY, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=84,
+ serialized_end=993,
+)
+
+
+_MESSAGEWITHCOMPONENTS = _descriptor.Descriptor(
+ name='MessageWithComponents',
+ full_name='org.apache.beam.runner_api.v1.MessageWithComponents',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='components', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.components', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='coder', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.coder', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='combine_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.combine_payload', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='function_spec', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.function_spec', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='par_do_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.par_do_payload', index=4,
+ number=6, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='ptransform', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.ptransform', index=5,
+ number=7, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='pcollection', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.pcollection', index=6,
+ number=8, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='read_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.read_payload', index=7,
+ number=9, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='side_input', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.side_input', index=8,
+ number=11, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='window_into_payload', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.window_into_payload', index=9,
+ number=12, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='windowing_strategy', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.windowing_strategy', index=10,
+ number=13, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='urn_with_parameter', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.urn_with_parameter', index=11,
+ number=14, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ _descriptor.OneofDescriptor(
+ name='root', full_name='org.apache.beam.runner_api.v1.MessageWithComponents.root',
+ index=0, containing_type=None, fields=[]),
+ ],
+ serialized_start=996,
+ serialized_end=1864,
+)
+
+
+_PIPELINE = _descriptor.Descriptor(
+ name='Pipeline',
+ full_name='org.apache.beam.runner_api.v1.Pipeline',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='components', full_name='org.apache.beam.runner_api.v1.Pipeline.components', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='root_transform_id', full_name='org.apache.beam.runner_api.v1.Pipeline.root_transform_id', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='display_data', full_name='org.apache.beam.runner_api.v1.Pipeline.display_data', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1867,
+ serialized_end=2033,
+)
+
+
+_PTRANSFORM_INPUTSENTRY = _descriptor.Descriptor(
+ name='InputsEntry',
+ full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.PTransform.InputsEntry.value', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2367,
+ serialized_end=2412,
+)
+
+_PTRANSFORM_OUTPUTSENTRY = _descriptor.Descriptor(
+ name='OutputsEntry',
+ full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.PTransform.OutputsEntry.value', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2414,
+ serialized_end=2460,
+)
+
+_PTRANSFORM = _descriptor.Descriptor(
+ name='PTransform',
+ full_name='org.apache.beam.runner_api.v1.PTransform',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='unique_name', full_name='org.apache.beam.runner_api.v1.PTransform.unique_name', index=0,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='spec', full_name='org.apache.beam.runner_api.v1.PTransform.spec', index=1,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='subtransforms', full_name='org.apache.beam.runner_api.v1.PTransform.subtransforms', index=2,
+ number=2, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='inputs', full_name='org.apache.beam.runner_api.v1.PTransform.inputs', index=3,
+ number=3, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='outputs', full_name='org.apache.beam.runner_api.v1.PTransform.outputs', index=4,
+ number=4, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='display_data', full_name='org.apache.beam.runner_api.v1.PTransform.display_data', index=5,
+ number=6, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_PTRANSFORM_INPUTSENTRY, _PTRANSFORM_OUTPUTSENTRY, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2036,
+ serialized_end=2460,
+)
+
+
+_PCOLLECTION = _descriptor.Descriptor(
+ name='PCollection',
+ full_name='org.apache.beam.runner_api.v1.PCollection',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='unique_name', full_name='org.apache.beam.runner_api.v1.PCollection.unique_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='coder_id', full_name='org.apache.beam.runner_api.v1.PCollection.coder_id', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='is_bounded', full_name='org.apache.beam.runner_api.v1.PCollection.is_bounded', index=2,
+ number=3, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='windowing_strategy_id', full_name='org.apache.beam.runner_api.v1.PCollection.windowing_strategy_id', index=3,
+ number=4, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='display_data', full_name='org.apache.beam.runner_api.v1.PCollection.display_data', index=4,
+ number=5, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2463,
+ serialized_end=2674,
+)
+
+
+_PARDOPAYLOAD_SIDEINPUTSENTRY = _descriptor.Descriptor(
+ name='SideInputsEntry',
+ full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.ParDoPayload.SideInputsEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3023,
+ serialized_end=3114,
+)
+
+_PARDOPAYLOAD = _descriptor.Descriptor(
+ name='ParDoPayload',
+ full_name='org.apache.beam.runner_api.v1.ParDoPayload',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='do_fn', full_name='org.apache.beam.runner_api.v1.ParDoPayload.do_fn', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='parameters', full_name='org.apache.beam.runner_api.v1.ParDoPayload.parameters', index=1,
+ number=2, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='side_inputs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.side_inputs', index=2,
+ number=3, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='state_specs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.state_specs', index=3,
+ number=4, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='timer_specs', full_name='org.apache.beam.runner_api.v1.ParDoPayload.timer_specs', index=4,
+ number=5, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_PARDOPAYLOAD_SIDEINPUTSENTRY, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2677,
+ serialized_end=3114,
+)
+
+
+_PARAMETER = _descriptor.Descriptor(
+ name='Parameter',
+ full_name='org.apache.beam.runner_api.v1.Parameter',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='type', full_name='org.apache.beam.runner_api.v1.Parameter.type', index=0,
+ number=1, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ _PARAMETER_TYPE,
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3117,
+ serialized_end=3256,
+)
+
+
+_STATESPEC = _descriptor.Descriptor(
+ name='StateSpec',
+ full_name='org.apache.beam.runner_api.v1.StateSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3258,
+ serialized_end=3269,
+)
+
+
+_TIMERSPEC = _descriptor.Descriptor(
+ name='TimerSpec',
+ full_name='org.apache.beam.runner_api.v1.TimerSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3271,
+ serialized_end=3282,
+)
+
+
+_READPAYLOAD = _descriptor.Descriptor(
+ name='ReadPayload',
+ full_name='org.apache.beam.runner_api.v1.ReadPayload',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='source', full_name='org.apache.beam.runner_api.v1.ReadPayload.source', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='is_bounded', full_name='org.apache.beam.runner_api.v1.ReadPayload.is_bounded', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3285,
+ serialized_end=3421,
+)
+
+
+_WINDOWINTOPAYLOAD = _descriptor.Descriptor(
+ name='WindowIntoPayload',
+ full_name='org.apache.beam.runner_api.v1.WindowIntoPayload',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='window_fn', full_name='org.apache.beam.runner_api.v1.WindowIntoPayload.window_fn', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3423,
+ serialized_end=3506,
+)
+
+
+_COMBINEPAYLOAD_SIDEINPUTSENTRY = _descriptor.Descriptor(
+ name='SideInputsEntry',
+ full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.CombinePayload.SideInputsEntry.value', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3023,
+ serialized_end=3114,
+)
+
+_COMBINEPAYLOAD = _descriptor.Descriptor(
+ name='CombinePayload',
+ full_name='org.apache.beam.runner_api.v1.CombinePayload',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='combine_fn', full_name='org.apache.beam.runner_api.v1.CombinePayload.combine_fn', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='accumulator_coder_id', full_name='org.apache.beam.runner_api.v1.CombinePayload.accumulator_coder_id', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='parameters', full_name='org.apache.beam.runner_api.v1.CombinePayload.parameters', index=2,
+ number=3, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='side_inputs', full_name='org.apache.beam.runner_api.v1.CombinePayload.side_inputs', index=3,
+ number=4, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_COMBINEPAYLOAD_SIDEINPUTSENTRY, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3509,
+ serialized_end=3859,
+)
+
+
+_CODER = _descriptor.Descriptor(
+ name='Coder',
+ full_name='org.apache.beam.runner_api.v1.Coder',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='spec', full_name='org.apache.beam.runner_api.v1.Coder.spec', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='component_coder_ids', full_name='org.apache.beam.runner_api.v1.Coder.component_coder_ids', index=1,
+ number=2, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3861,
+ serialized_end=3956,
+)
+
+
+_WINDOWINGSTRATEGY = _descriptor.Descriptor(
+ name='WindowingStrategy',
+ full_name='org.apache.beam.runner_api.v1.WindowingStrategy',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='window_fn', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.window_fn', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='merge_status', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.merge_status', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='window_coder_id', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.window_coder_id', index=2,
+ number=3, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='trigger', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.trigger', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='accumulation_mode', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.accumulation_mode', index=4,
+ number=5, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='output_time', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.output_time', index=5,
+ number=6, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='closing_behavior', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.closing_behavior', index=6,
+ number=7, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='allowed_lateness', full_name='org.apache.beam.runner_api.v1.WindowingStrategy.allowed_lateness', index=7,
+ number=8, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=3959,
+ serialized_end=4430,
+)
+
+
+_TRIGGER_AFTERALL = _descriptor.Descriptor(
+ name='AfterAll',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterAll',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterAll.subtriggers', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5364,
+ serialized_end=5435,
+)
+
+_TRIGGER_AFTERANY = _descriptor.Descriptor(
+ name='AfterAny',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterAny',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterAny.subtriggers', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5437,
+ serialized_end=5508,
+)
+
+_TRIGGER_AFTEREACH = _descriptor.Descriptor(
+ name='AfterEach',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterEach',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='subtriggers', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEach.subtriggers', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5510,
+ serialized_end=5582,
+)
+
+_TRIGGER_AFTERENDOFWINDOW = _descriptor.Descriptor(
+ name='AfterEndOfWindow',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='early_firings', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow.early_firings', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='late_firings', full_name='org.apache.beam.runner_api.v1.Trigger.AfterEndOfWindow.late_firings', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5585,
+ serialized_end=5728,
+)
+
+_TRIGGER_AFTERPROCESSINGTIME = _descriptor.Descriptor(
+ name='AfterProcessingTime',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterProcessingTime',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='timestamp_transforms', full_name='org.apache.beam.runner_api.v1.Trigger.AfterProcessingTime.timestamp_transforms', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5730,
+ serialized_end=5832,
+)
+
+_TRIGGER_AFTERSYNCHRONIZEDPROCESSINGTIME = _descriptor.Descriptor(
+ name='AfterSynchronizedProcessingTime',
+ full_name='org.apache.beam.runner_api.v1.Trigger.AfterSynchronizedProcessingTime',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5834,
+ serialized_end=5867,
+)
+
+_TRIGGER_DEFAULT = _descriptor.Descriptor(
+ name='Default',
+ full_name='org.apache.beam.runner_api.v1.Trigger.Default',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5869,
+ serialized_end=5878,
+)
+
+_TRIGGER_ELEMENTCOUNT = _descriptor.Descriptor(
+ name='ElementCount',
+ full_name='org.apache.beam.runner_api.v1.Trigger.ElementCount',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='element_count', full_name='org.apache.beam.runner_api.v1.Trigger.ElementCount.element_count', index=0,
+ number=1, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5880,
+ serialized_end=5917,
+)
+
+_TRIGGER_NEVER = _descriptor.Descriptor(
+ name='Never',
+ full_name='org.apache.beam.runner_api.v1.Trigger.Never',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5919,
+ serialized_end=5926,
+)
+
+_TRIGGER_ALWAYS = _descriptor.Descriptor(
+ name='Always',
+ full_name='org.apache.beam.runner_api.v1.Trigger.Always',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5928,
+ serialized_end=5936,
+)
+
+_TRIGGER_ORFINALLY = _descriptor.Descriptor(
+ name='OrFinally',
+ full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='main', full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally.main', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='finally', full_name='org.apache.beam.runner_api.v1.Trigger.OrFinally.finally', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=5938,
+ serialized_end=6060,
+)
+
+_TRIGGER_REPEAT = _descriptor.Descriptor(
+ name='Repeat',
+ full_name='org.apache.beam.runner_api.v1.Trigger.Repeat',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='subtrigger', full_name='org.apache.beam.runner_api.v1.Trigger.Repeat.subtrigger', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6062,
+ serialized_end=6130,
+)
+
+_TRIGGER = _descriptor.Descriptor(
+ name='Trigger',
+ full_name='org.apache.beam.runner_api.v1.Trigger',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='after_all', full_name='org.apache.beam.runner_api.v1.Trigger.after_all', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='after_any', full_name='org.apache.beam.runner_api.v1.Trigger.after_any', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='after_each', full_name='org.apache.beam.runner_api.v1.Trigger.after_each', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='after_end_of_widow', full_name='org.apache.beam.runner_api.v1.Trigger.after_end_of_widow', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='after_processing_time', full_name='org.apache.beam.runner_api.v1.Trigger.after_processing_time', index=4,
+ number=5, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='after_synchronized_processing_time', full_name='org.apache.beam.runner_api.v1.Trigger.after_synchronized_processing_time', index=5,
+ number=6, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='always', full_name='org.apache.beam.runner_api.v1.Trigger.always', index=6,
+ number=12, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='default', full_name='org.apache.beam.runner_api.v1.Trigger.default', index=7,
+ number=7, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='element_count', full_name='org.apache.beam.runner_api.v1.Trigger.element_count', index=8,
+ number=8, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='never', full_name='org.apache.beam.runner_api.v1.Trigger.never', index=9,
+ number=9, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='or_finally', full_name='org.apache.beam.runner_api.v1.Trigger.or_finally', index=10,
+ number=10, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='repeat', full_name='org.apache.beam.runner_api.v1.Trigger.repeat', index=11,
+ number=11, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_TRIGGER_AFTERALL, _TRIGGER_AFTERANY, _TRIGGER_AFTEREACH, _TRIGGER_AFTERENDOFWINDOW, _TRIGGER_AFTERPROCESSINGTIME, _TRIGGER_AFTERSYNCHRONIZEDPROCESSINGTIME, _TRIGGER_DEFAULT, _TRIGGER_ELEMENTCOUNT, _TRIGGER_NEVER, _TRIGGER_ALWAYS, _TRIGGER_ORFINALLY, _TRIGGER_REPEAT, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ _descriptor.OneofDescriptor(
+ name='trigger', full_name='org.apache.beam.runner_api.v1.Trigger.trigger',
+ index=0, containing_type=None, fields=[]),
+ ],
+ serialized_start=4433,
+ serialized_end=6141,
+)
+
+
+_TIMESTAMPTRANSFORM_DELAY = _descriptor.Descriptor(
+ name='Delay',
+ full_name='org.apache.beam.runner_api.v1.TimestampTransform.Delay',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='delay_millis', full_name='org.apache.beam.runner_api.v1.TimestampTransform.Delay.delay_millis', index=0,
+ number=1, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6319,
+ serialized_end=6348,
+)
+
+_TIMESTAMPTRANSFORM_ALIGNTO = _descriptor.Descriptor(
+ name='AlignTo',
+ full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='period', full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo.period', index=0,
+ number=3, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='org.apache.beam.runner_api.v1.TimestampTransform.AlignTo.offset', index=1,
+ number=4, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6350,
+ serialized_end=6391,
+)
+
+_TIMESTAMPTRANSFORM = _descriptor.Descriptor(
+ name='TimestampTransform',
+ full_name='org.apache.beam.runner_api.v1.TimestampTransform',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='delay', full_name='org.apache.beam.runner_api.v1.TimestampTransform.delay', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='align_to', full_name='org.apache.beam.runner_api.v1.TimestampTransform.align_to', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_TIMESTAMPTRANSFORM_DELAY, _TIMESTAMPTRANSFORM_ALIGNTO, ],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ _descriptor.OneofDescriptor(
+ name='timestamp_transform', full_name='org.apache.beam.runner_api.v1.TimestampTransform.timestamp_transform',
+ index=0, containing_type=None, fields=[]),
+ ],
+ serialized_start=6144,
+ serialized_end=6414,
+)
+
+
+_SIDEINPUT = _descriptor.Descriptor(
+ name='SideInput',
+ full_name='org.apache.beam.runner_api.v1.SideInput',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='access_pattern', full_name='org.apache.beam.runner_api.v1.SideInput.access_pattern', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='view_fn', full_name='org.apache.beam.runner_api.v1.SideInput.view_fn', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='window_mapping_fn', full_name='org.apache.beam.runner_api.v1.SideInput.window_mapping_fn', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6417,
+ serialized_end=6635,
+)
+
+
+_ENVIRONMENT = _descriptor.Descriptor(
+ name='Environment',
+ full_name='org.apache.beam.runner_api.v1.Environment',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='url', full_name='org.apache.beam.runner_api.v1.Environment.url', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6637,
+ serialized_end=6663,
+)
+
+
+_FUNCTIONSPEC = _descriptor.Descriptor(
+ name='FunctionSpec',
+ full_name='org.apache.beam.runner_api.v1.FunctionSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='spec', full_name='org.apache.beam.runner_api.v1.FunctionSpec.spec', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='environment_id', full_name='org.apache.beam.runner_api.v1.FunctionSpec.environment_id', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6665,
+ serialized_end=6766,
+)
+
+
+_URNWITHPARAMETER = _descriptor.Descriptor(
+ name='UrnWithParameter',
+ full_name='org.apache.beam.runner_api.v1.UrnWithParameter',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='urn', full_name='org.apache.beam.runner_api.v1.UrnWithParameter.urn', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='parameter', full_name='org.apache.beam.runner_api.v1.UrnWithParameter.parameter', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6768,
+ serialized_end=6840,
+)
+
+
+_DISPLAYDATA_IDENTIFIER = _descriptor.Descriptor(
+ name='Identifier',
+ full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='transform_id', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.transform_id', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='transform_urn', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.transform_urn', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='key', full_name='org.apache.beam.runner_api.v1.DisplayData.Identifier.key', index=2,
+ number=3, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6922,
+ serialized_end=6992,
+)
+
+_DISPLAYDATA_ITEM = _descriptor.Descriptor(
+ name='Item',
+ full_name='org.apache.beam.runner_api.v1.DisplayData.Item',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='id', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.id', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='type', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.type', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.value', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='short_value', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.short_value', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='label', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.label', index=4,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='link_url', full_name='org.apache.beam.runner_api.v1.DisplayData.Item.link_url', index=5,
+ number=6, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6995,
+ serialized_end=7244,
+)
+
+_DISPLAYDATA = _descriptor.Descriptor(
+ name='DisplayData',
+ full_name='org.apache.beam.runner_api.v1.DisplayData',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='items', full_name='org.apache.beam.runner_api.v1.DisplayData.items', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[_DISPLAYDATA_IDENTIFIER, _DISPLAYDATA_ITEM, ],
+ enum_types=[
+ _DISPLAYDATA_TYPE,
+ ],
+ options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=6843,
+ serialized_end=7346,
+)
+
+_COMPONENTS_TRANSFORMSENTRY.fields_by_name['value'].message_type = _PTRANSFORM
+_COMPONENTS_TRANSFORMSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_PCOLLECTIONSENTRY.fields_by_name['value'].message_type = _PCOLLECTION
+_COMPONENTS_PCOLLECTIONSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY.fields_by_name['value'].message_type = _WINDOWINGSTRATEGY
+_COMPONENTS_WINDOWINGSTRATEGIESENTRY.containing_type = _COMPONENTS
+_COMPONENTS_CODERSENTRY.fields_by_name['value'].message_type = _CODER
+_COMPONENTS_CODERSENTRY.containing_type = _COMPONENTS
+_COMPONENTS_ENVIRONMENTSENTRY.fields_by_name['value'].message_type = _ENVIRONMENT
+_COMPONENTS_ENVIRONMENTSENTRY.containing_type = _COMPONENTS
+_COMPONENTS.fields_by_name['transforms'].message_type = _COMPONENTS_TRANSFORMSENTRY
+_COMPONENTS.fields_by_name['pcollections'].message_type = _COMPONENTS_PCOLLECTIONSENTRY
+_COMPONENTS.fields_by_name['windowing_strategies'].message_type = _COMPONENTS_WINDOWINGSTRATEGIESENTRY
+_COMPONENTS.fields_by_name['coders'].message_type = _COMPONENTS_CODERSENTRY
+_COMPONENTS.fields_by_name['environments'].message_type = _COMPONENTS_ENVIRONMENTSENTRY
+_MESSAGEWITHCOMPONENTS.fields_by_name['components'].message_type = _COMPONENTS
+_MESSAGEWITHCOMPONENTS.fields_by_name['coder'].message_type = _CODER
+_MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'].message_type = _COMBINEPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'].message_type = _FUNCTIONSPEC
+_MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'].message_type = _PARDOPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'].message_type = _PTRANSFORM
+_MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'].message_type = _PCOLLECTION
+_MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'].message_type = _READPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['side_input'].message_type = _SIDEINPUT
+_MESSAGEWITHCOMPONENTS.fields_by_name['window_into_payload'].message_type = _WINDOWINTOPAYLOAD
+_MESSAGEWITHCOMPONENTS.fields_by_name['windowing_strategy'].message_type = _WINDOWINGSTRATEGY
+_MESSAGEWITHCOMPONENTS.fields_by_name['urn_with_parameter'].message_type = _URNWITHPARAMETER
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['coder'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['coder'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['combine_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['function_spec'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['par_do_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['ptransform'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['pcollection'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].fields.append(
+ _MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'])
+_MESSAGEWITHCOMPONENTS.fields_by_name['read_payload'].containing_oneof = _MESSAGEWITHCOMPONENTS.oneofs_by_name['root']
+_MESSAGEWITHCOMPONENTS.oneofs_by_name['root'].f
<TRUNCATED>
[2/9] beam git commit: Auto-generated runner api proto bindings.
Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index afc5fb4..5e63856 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -34,7 +34,8 @@ EXCLUDED_GENERATED_FILES=(
"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py"
"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
-"apache_beam/coders/proto2_coder_test_messages_pb2.py")
+"apache_beam/coders/proto2_coder_test_messages_pb2.py"
+"apache_beam/runners/api/beam_runner_api_pb2.py")
FILES_TO_IGNORE=""
for file in "${EXCLUDED_GENERATED_FILES[@]}"; do
[8/9] beam git commit: Move pipeline context and add more tests.
Posted by ro...@apache.org.
Move pipeline context and add more tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deff128f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deff128f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deff128f
Branch: refs/heads/master
Commit: deff128ff07ccc67956e1d5a94a64f2a31b224c8
Parents: b2da21e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 09:21:33 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:02 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 93 ++++++++++++++++++++
sdks/python/apache_beam/pipeline.py | 62 -------------
.../apache_beam/runners/pipeline_context.py | 88 ++++++++++++++++++
.../runners/pipeline_context_test.py | 49 +++++++++++
sdks/python/apache_beam/transforms/core.py | 1 +
.../apache_beam/transforms/trigger_test.py | 18 +---
sdks/python/apache_beam/transforms/window.py | 2 +-
.../apache_beam/transforms/window_test.py | 6 +-
sdks/python/apache_beam/utils/urns.py | 2 +-
9 files changed, 238 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fd72af8..9f5a97a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -266,6 +266,12 @@ class BytesCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class VarIntCoder(FastCoder):
"""Variable-length integer coder."""
@@ -276,6 +282,12 @@ class VarIntCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class FloatCoder(FastCoder):
"""A coder used for floating-point values."""
@@ -286,6 +298,12 @@ class FloatCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class TimestampCoder(FastCoder):
"""A coder used for timeutil.Timestamp values."""
@@ -296,6 +314,12 @@ class TimestampCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class SingletonCoder(FastCoder):
"""A coder that always encodes exactly one value."""
@@ -309,6 +333,12 @@ class SingletonCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):
+ return type(self) == type(other) and self._value == other._value
+
+ def __hash__(self):
+ return hash(self._value)
+
def maybe_dill_dumps(o):
"""Pickle using cPickle or the Dill pickler as a fallback."""
@@ -365,6 +395,12 @@ class _PickleCoderBase(FastCoder):
def value_coder(self):
return self
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class PickleCoder(_PickleCoderBase):
"""Coder using Python's pickle functionality."""
@@ -446,6 +482,12 @@ class FastPrimitivesCoder(FastCoder):
def value_coder(self):
return self
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class Base64PickleCoder(Coder):
"""Coder of objects by Python pickle, then base64 encoding."""
@@ -503,6 +545,13 @@ class ProtoCoder(FastCoder):
# a Map.
return False
+ def __eq__(self, other):
+ return (type(self) == type(other)
+ and self.proto_message_type == other.proto_message_type)
+
+ def __hash__(self):
+ return hash(self.proto_message_type)
+
@staticmethod
def from_type_hint(typehint, unused_registry):
if issubclass(typehint, google.protobuf.message.Message):
@@ -563,6 +612,13 @@ class TupleCoder(FastCoder):
def __repr__(self):
return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
+ def __eq__(self, other):  # same type and equal component coders
+ return (type(self) == type(other)
+ and self._coders == other._coders)
+
+ def __hash__(self):
+ return hash(self._coders)
+
class TupleSequenceCoder(FastCoder):
"""Coder of homogeneous tuple objects."""
@@ -586,6 +642,13 @@ class TupleSequenceCoder(FastCoder):
def __repr__(self):
return 'TupleSequenceCoder[%r]' % self._elem_coder
+ def __eq__(self, other):  # same type and equal element coder
+ return (type(self) == type(other)
+ and self._elem_coder == other._elem_coder)
+
+ def __hash__(self):
+ return hash((type(self), self._elem_coder))
+
class IterableCoder(FastCoder):
"""Coder of iterables of homogeneous objects."""
@@ -619,6 +682,13 @@ class IterableCoder(FastCoder):
def __repr__(self):
return 'IterableCoder[%r]' % self._elem_coder
+ def __eq__(self, other):  # same type and equal element coder
+ return (type(self) == type(other)
+ and self._elem_coder == other._elem_coder)
+
+ def __hash__(self):
+ return hash((type(self), self._elem_coder))
+
class WindowCoder(PickleCoder):
"""Coder for windows in windowed values."""
@@ -663,6 +733,12 @@ class IntervalWindowCoder(FastCoder):
'@type': 'kind:interval_window',
}
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
class WindowedValueCoder(FastCoder):
"""Coder for windowed values."""
@@ -709,6 +785,16 @@ class WindowedValueCoder(FastCoder):
def __repr__(self):
return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
+ def __eq__(self, other):
+ return (type(self) == type(other)
+ and self.wrapped_value_coder == other.wrapped_value_coder
+ and self.timestamp_coder == other.timestamp_coder
+ and self.window_coder == other.window_coder)
+
+ def __hash__(self):
+ return hash(
+ (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
+
class LengthPrefixCoder(FastCoder):
"""Coder which prefixes the length of the encoded object in the stream."""
@@ -740,3 +826,10 @@ class LengthPrefixCoder(FastCoder):
def __repr__(self):
return 'LengthPrefixCoder[%r]' % self._value_coder
+
+ def __eq__(self, other):
+ return (type(self) == type(other)
+ and self._value_coder == other._value_coder)
+
+ def __hash__(self):
+ return hash((type(self), self._value_coder))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 9edcf9b..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,14 +52,11 @@ import os
import shutil
import tempfile
-from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
-from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -443,62 +440,3 @@ class AppliedPTransform(object):
if v not in visited:
visited.add(v)
visitor.visit_value(v, self)
-
-
-class PipelineContextMap(object):
- """This is a bi-directional map between objects and ids.
-
- Under the hood it encodes and decodes these objects into runner API
- representations.
- """
- def __init__(self, context, obj_type, proto_map=None):
- self._pipeline_context = context
- self._obj_type = obj_type
- self._obj_to_id = {}
- self._id_to_obj = {}
- self._id_to_proto = proto_map if proto_map else {}
- self._counter = 0
-
- def _unique_ref(self):
- self._counter += 1
- return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
-
- def populate_map(self, proto_map):
- for id, obj in self._id_to_obj:
- proto_map[id] = self._id_to_proto[id]
-
- def get_id(self, obj):
- if obj not in self._obj_to_id:
- id = self._unique_ref()
- self._id_to_obj[id] = obj
- self._obj_to_id[obj] = id
- self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
- return self._obj_to_id[obj]
-
- def get_by_id(self, id):
- if id not in self._id_to_obj:
- self._id_to_obj[id] = self._obj_type.from_runner_api(
- self._id_to_proto[id], self._pipeline_context)
- return self._id_to_obj[id]
-
-
-class PipelineContext(object):
-
- _COMPONENT_TYPES = {
- 'transforms': AppliedPTransform,
- 'pcollections': pvalue.PCollection,
- 'coders': coders.Coder,
- 'windowing_strategies': core.Windowing,
- # TODO: environment
- }
-
- def __init__(self, context_proto=None):
- for name, cls in self._COMPONENT_TYPES.items():
- setattr(self, name,
- PipelineContextMap(self, cls, getattr(context_proto, name, None)))
-
- def to_runner_api(self):
- context_proto = beam_runner_api_pb2.Components()
- for name, cls in self._COMPONENT_TYPES:
- getattr(self, name).populate_map(getattr(context_proto, name))
- return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
new file mode 100644
index 0000000..4f82774
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam import coders
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
+
+
+class _PipelineContextMap(object):
+ """This is a bi-directional map between objects and ids.
+
+ Under the hood it encodes and decodes these objects into runner API
+ representations.
+ """
+ def __init__(self, context, obj_type, proto_map=None):
+ self._pipeline_context = context
+ self._obj_type = obj_type
+ self._obj_to_id = {}
+ self._id_to_obj = {}
+ self._id_to_proto = proto_map if proto_map else {}
+ self._counter = 0
+
+ def _unique_ref(self):
+ self._counter += 1
+ return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+ def populate_map(self, proto_map):
+ for id, proto in self._id_to_proto.items():
+ proto_map[id].CopyFrom(proto)
+
+ def get_id(self, obj):
+ if obj not in self._obj_to_id:
+ id = self._unique_ref()
+ self._id_to_obj[id] = obj
+ self._obj_to_id[obj] = id
+ self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+ return self._obj_to_id[obj]
+
+ def get_by_id(self, id):
+ if id not in self._id_to_obj:
+ self._id_to_obj[id] = self._obj_type.from_runner_api(
+ self._id_to_proto[id], self._pipeline_context)
+ return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+ """Used for accessing and constructing the referenced objects of a Pipeline.
+ """
+
+ _COMPONENT_TYPES = {
+ 'transforms': pipeline.AppliedPTransform,
+ 'pcollections': pvalue.PCollection,
+ 'coders': coders.Coder,
+ 'windowing_strategies': core.Windowing,
+ # TODO: environment
+ }
+
+ def __init__(self, context_proto=None):
+ for name, cls in self._COMPONENT_TYPES.items():
+ setattr(
+ self, name, _PipelineContextMap(
+ self, cls, getattr(context_proto, name, None)))
+
+ @staticmethod
+ def from_runner_api(proto):
+ return PipelineContext(proto)
+
+ def to_runner_api(self):
+ context_proto = beam_runner_api_pb2.Components()
+ for name in self._COMPONENT_TYPES:
+ getattr(self, name).populate_map(getattr(context_proto, name))
+ return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py
new file mode 100644
index 0000000..6091ed8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the pipeline context classes."""
+
+import unittest
+
+from apache_beam import coders
+from apache_beam.runners import pipeline_context
+
+
+class PipelineContextTest(unittest.TestCase):
+
+ def test_deduplication(self):
+ context = pipeline_context.PipelineContext()
+ bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+ bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
+ self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
+
+ def test_serialization(self):
+ context = pipeline_context.PipelineContext()
+ float_coder_ref = context.coders.get_id(coders.FloatCoder())
+ bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+ proto = context.to_runner_api()
+ context2 = pipeline_context.PipelineContext.from_runner_api(proto)
+ self.assertEqual(
+ coders.FloatCoder(),
+ context2.coders.get_by_id(float_coder_ref))
+ self.assertEqual(
+ coders.BytesCoder(),
+ context2.coders.get_by_id(bytes_coder_ref))
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 1fc63b2..3251671 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1235,6 +1235,7 @@ class Windowing(object):
trigger=self.triggerfn.to_runner_api(context),
accumulation_mode=self.accumulation_mode,
output_time=self.output_time_fn,
+ # TODO(robertwb): Support EMIT_IF_NONEMPTY
closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
allowed_lateness=0)
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index cc9e0f5..827aa33 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,6 +25,7 @@ import unittest
import yaml
import apache_beam as beam
+from apache_beam.runners import pipeline_context
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import trigger
from apache_beam.transforms.core import Windowing
@@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase):
AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
Repeatedly(AfterCount(100)),
trigger.OrFinally(AfterCount(3), AfterCount(10))):
- context = beam.pipeline.PipelineContext()
- self.assertEqual(
- trigger_fn,
- TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
-
- def test_windowing_strategy_encoding(self):
- for trigger_fn in (
- DefaultTrigger(),
- AfterAll(AfterCount(1), AfterCount(10)),
- AfterFirst(AfterCount(10), AfterCount(100)),
- AfterEach(AfterCount(100), AfterCount(1000)),
- AfterWatermark(early=AfterCount(1000)),
- AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
- Repeatedly(AfterCount(100)),
- trigger.OrFinally(AfterCount(3), AfterCount(10))):
- context = beam.pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
trigger_fn,
TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c763a96..3878dff 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -73,6 +73,7 @@ class OutputTimeFn(object):
OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+ # TODO(robertwb): Add this to the runner API or remove it.
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
@staticmethod
@@ -167,7 +168,6 @@ class WindowFn(object):
return pickler.loads(fn_parameter.value)
def to_runner_api_parameter(self, context):
- raise TypeError(self)
return (urns.PICKLED_WINDOW_FN,
wrappers_pb2.BytesValue(value=pickler.dumps(self)))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c79739a..99be02c 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
import unittest
-from apache_beam import pipeline
+from apache_beam.runners import pipeline_context
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
@@ -238,7 +238,7 @@ class RunnerApiTest(unittest.TestCase):
FixedWindows(37),
SlidingWindows(2, 389),
Sessions(5077)):
- context = pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
window_fn,
WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
@@ -251,7 +251,7 @@ class RunnerApiTest(unittest.TestCase):
Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
accumulation_mode=AccumulationMode.DISCARDING)):
- context = pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
windowing,
Windowing.from_runner_api(windowing.to_runner_api(context), context))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 186c99c..936e2cb 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -21,4 +21,4 @@ FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
-PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"
+PICKLED_CODER = "beam:coder:pickled_python:v0.1"
[6/9] beam git commit: Runner API translation of triggers and
windowing strategies.
Posted by ro...@apache.org.
Runner API translation of triggers and windowing strategies.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b86e1fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b86e1fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b86e1fc
Branch: refs/heads/master
Commit: 5b86e1fc22234a7a6dd00696326fa0fae8fe7a2d
Parents: aad32b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 16:18:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 20 +++
sdks/python/apache_beam/pipeline.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 38 +++++
sdks/python/apache_beam/transforms/trigger.py | 143 ++++++++++++++++++-
.../apache_beam/transforms/trigger_test.py | 33 +++++
sdks/python/apache_beam/transforms/window.py | 34 +++--
.../apache_beam/transforms/window_test.py | 23 ++-
7 files changed, 272 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1d29f32..fd72af8 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -22,6 +22,8 @@ import cPickle as pickle
import google.protobuf
from apache_beam.coders import coder_impl
+from apache_beam.utils import urns
+from apache_beam.utils import proto_utils
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -182,6 +184,24 @@ class Coder(object):
and self._dict_without_impl() == other._dict_without_impl())
# pylint: enable=protected-access
+ def to_runner_api(self, context):
+ # TODO(BEAM-115): Use specialized URNs and components.
+ from apache_beam.runners.api import beam_runner_api_pb2
+ return beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ spec=beam_runner_api_pb2.UrnWithParameter(
+ urn=urns.PICKLED_CODER,
+ parameter=proto_utils.pack_Any(
+ google.protobuf.wrappers_pb2.BytesValue(
+ value=serialize_coder(self))))))
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ any_proto = proto.spec.spec.parameter
+ bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
+ any_proto.Unpack(bytes_proto)
+ return deserialize_coder(bytes_proto.value)
+
class StrUtf8Coder(Coder):
"""A coder used for reading and writing strings as UTF-8."""
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 4ec2e47..9edcf9b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -499,6 +499,6 @@ class PipelineContext(object):
def to_runner_api(self):
context_proto = beam_runner_api_pb2.Components()
- for name, cls in self._COMPONENT_TYEPS:
+ for name, cls in self._COMPONENT_TYPES:
getattr(self, name).populate_map(getattr(context_proto, name))
return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7abd784..1fc63b2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -27,6 +27,7 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.coders import typecoders
from apache_beam.internal import util
+from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
@@ -49,6 +50,7 @@ from apache_beam.typehints import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.utils.pipeline_options import TypeOptions
+
# Type variables
T = typehints.TypeVariable('T')
K = typehints.TypeVariable('K')
@@ -1207,9 +1209,45 @@ class Windowing(object):
self.accumulation_mode,
self.output_time_fn)
+ def __eq__(self, other):
+ if type(self) == type(other):
+ if self._is_default and other._is_default:
+ return True
+ else:
+ return (
+ self.windowfn == other.windowfn
+ and self.triggerfn == other.triggerfn
+ and self.accumulation_mode == other.accumulation_mode
+ and self.output_time_fn == other.output_time_fn)
+
def is_default(self):
return self._is_default
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.WindowingStrategy(
+ window_fn=self.windowfn.to_runner_api(context),
+ # TODO(robertwb): Prohibit implicit multi-level merging.
+ merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+ if self.windowfn.is_merging()
+ else beam_runner_api_pb2.NON_MERGING),
+ window_coder_id=context.coders.get_id(
+ self.windowfn.get_window_coder()),
+ trigger=self.triggerfn.to_runner_api(context),
+ accumulation_mode=self.accumulation_mode,
+ output_time=self.output_time_fn,
+ closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+ allowed_lateness=0)
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.transforms.trigger import TriggerFn
+ return Windowing(
+ windowfn=WindowFn.from_runner_api(proto.window_fn, context),
+ triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
+ accumulation_mode=proto.accumulation_mode,
+ output_time_fn=proto.output_time)
+
@typehints.with_input_types(T)
@typehints.with_output_types(T)
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 04198ba..b55d602 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -35,13 +35,14 @@ from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import OutputTimeFn
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
+from apache_beam.runners.api import beam_runner_api_pb2
class AccumulationMode(object):
"""Controls what to do with data when a trigger fires multiple times.
"""
- DISCARDING = 1
- ACCUMULATING = 2
+ DISCARDING = beam_runner_api_pb2.DISCARDING
+ ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
# TODO(robertwb): Provide retractions of previous outputs.
# RETRACTING = 3
@@ -185,6 +186,26 @@ class TriggerFn(object):
pass
# pylint: enable=unused-argument
+ @staticmethod
+ def from_runner_api(proto, context):
+ return {
+ 'after_all': AfterAll,
+ 'after_any': AfterFirst,
+ 'after_each': AfterEach,
+ 'after_end_of_widow': AfterWatermark,
+ # after_processing_time, after_synchronized_processing_time
+ # always
+ 'default': DefaultTrigger,
+ 'element_count': AfterCount,
+ # never
+ 'or_finally': OrFinally,
+ 'repeat': Repeatedly,
+ }[proto.WhichOneof('trigger')].from_runner_api(proto, context)
+
+ @abstractmethod
+ def to_runner_api(self, unused_context):
+ pass
+
class DefaultTrigger(TriggerFn):
"""Semantically Repeatedly(AfterWatermark()), but more optimized."""
@@ -216,6 +237,14 @@ class DefaultTrigger(TriggerFn):
def __eq__(self, other):
return type(self) == type(other)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return DefaultTrigger()
+
+ def to_runner_api(self, unused_context):
+ return beam_runner_api_pb2.Trigger(
+ default=beam_runner_api_pb2.Trigger.Default())
+
class AfterWatermark(TriggerFn):
"""Fire exactly once when the watermark passes the end of the window.
@@ -235,9 +264,9 @@ class AfterWatermark(TriggerFn):
def __repr__(self):
qualifiers = []
if self.early:
- qualifiers.append('early=%s' % self.early)
+ qualifiers.append('early=%s' % self.early.underlying)
if self.late:
- qualifiers.append('late=%s' % self.late)
+ qualifiers.append('late=%s' % self.late.underlying)
return 'AfterWatermark(%s)' % ', '.join(qualifiers)
def is_late(self, context):
@@ -305,6 +334,28 @@ class AfterWatermark(TriggerFn):
def __hash__(self):
return hash((type(self), self.early, self.late))
+ @staticmethod
+ def from_runner_api(proto, context):
+ return AfterWatermark(
+ early=TriggerFn.from_runner_api(
+ proto.after_end_of_widow.early_firings, context)
+ if proto.after_end_of_widow.HasField('early_firings')
+ else None,
+ late=TriggerFn.from_runner_api(
+ proto.after_end_of_widow.late_firings, context)
+ if proto.after_end_of_widow.HasField('late_firings')
+ else None)
+
+ def to_runner_api(self, context):
+ early_proto = self.early.underlying.to_runner_api(
+ context) if self.early else None
+ late_proto = self.late.underlying.to_runner_api(
+ context) if self.late else None
+ return beam_runner_api_pb2.Trigger(
+ after_end_of_widow=beam_runner_api_pb2.Trigger.AfterEndOfWindow(
+ early_firings=early_proto,
+ late_firings=late_proto))
+
class AfterCount(TriggerFn):
"""Fire when there are at least count elements in this window pane."""
@@ -317,6 +368,9 @@ class AfterCount(TriggerFn):
def __repr__(self):
return 'AfterCount(%s)' % self.count
+ def __eq__(self, other):
+ return type(self) == type(other) and self.count == other.count
+
def on_element(self, element, window, context):
context.add_state(self.COUNT_TAG, 1)
@@ -333,6 +387,15 @@ class AfterCount(TriggerFn):
def reset(self, window, context):
context.clear_state(self.COUNT_TAG)
+ @staticmethod
+ def from_runner_api(proto, unused_context):
+ return AfterCount(proto.element_count.element_count)
+
+ def to_runner_api(self, unused_context):
+ return beam_runner_api_pb2.Trigger(
+ element_count=beam_runner_api_pb2.Trigger.ElementCount(
+ element_count=self.count))
+
class Repeatedly(TriggerFn):
"""Repeatedly invoke the given trigger, never finishing."""
@@ -343,6 +406,9 @@ class Repeatedly(TriggerFn):
def __repr__(self):
return 'Repeatedly(%s)' % self.underlying
+ def __eq__(self, other):
+ return type(self) == type(other) and self.underlying == other.underlying
+
def on_element(self, element, window, context): # get window from context?
self.underlying.on_element(element, window, context)
@@ -360,6 +426,16 @@ class Repeatedly(TriggerFn):
def reset(self, window, context):
self.underlying.reset(window, context)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return Repeatedly(
+ TriggerFn.from_runner_api(proto.repeat.subtrigger, context))
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ repeat=beam_runner_api_pb2.Trigger.Repeat(
+ subtrigger=self.underlying.to_runner_api(context)))
+
class ParallelTriggerFn(TriggerFn):
@@ -372,6 +448,9 @@ class ParallelTriggerFn(TriggerFn):
return '%s(%s)' % (self.__class__.__name__,
', '.join(str(t) for t in self.triggers))
+ def __eq__(self, other):
+ return type(self) == type(other) and self.triggers == other.triggers
+
@abstractmethod
def combine_op(self, trigger_results):
pass
@@ -406,6 +485,31 @@ class ParallelTriggerFn(TriggerFn):
def _sub_context(context, index):
return NestedContext(context, '%d/' % index)
+ @staticmethod
+ def from_runner_api(proto, context):
+ subtriggers = [
+ TriggerFn.from_runner_api(subtrigger, context)
+ for subtrigger
+ in proto.after_all.subtriggers or proto.after_any.subtriggers]
+ if proto.after_all.subtriggers:
+ return AfterAll(*subtriggers)
+ else:
+ return AfterFirst(*subtriggers)
+
+ def to_runner_api(self, context):
+ subtriggers = [
+ subtrigger.to_runner_api(context) for subtrigger in self.triggers]
+ if self.combine_op == all:
+ return beam_runner_api_pb2.Trigger(
+ after_all=beam_runner_api_pb2.Trigger.AfterAll(
+ subtriggers=subtriggers))
+ elif self.combine_op == any:
+ return beam_runner_api_pb2.Trigger(
+ after_any=beam_runner_api_pb2.Trigger.AfterAny(
+ subtriggers=subtriggers))
+ else:
+ raise NotImplementedError(self)
+
class AfterFirst(ParallelTriggerFn):
"""Fires when any subtrigger fires.
@@ -435,6 +539,9 @@ class AfterEach(TriggerFn):
return '%s(%s)' % (self.__class__.__name__,
', '.join(str(t) for t in self.triggers))
+ def __eq__(self, other):
+ return type(self) == type(other) and self.triggers == other.triggers
+
def on_element(self, element, window, context):
ix = context.get_state(self.INDEX_TAG)
if ix < len(self.triggers):
@@ -474,12 +581,40 @@ class AfterEach(TriggerFn):
def _sub_context(context, index):
return NestedContext(context, '%d/' % index)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return AfterEach(*[
+ TriggerFn.from_runner_api(subtrigger, context)
+ for subtrigger in proto.after_each.subtriggers])
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ after_each=beam_runner_api_pb2.Trigger.AfterEach(
+ subtriggers=[
+ subtrigger.to_runner_api(context)
+ for subtrigger in self.triggers]))
+
class OrFinally(AfterFirst):
def __init__(self, body_trigger, exit_trigger):
super(OrFinally, self).__init__(body_trigger, exit_trigger)
+ @staticmethod
+ def from_runner_api(proto, context):
+ return OrFinally(
+ TriggerFn.from_runner_api(proto.or_finally.main, context),
+ # getattr is used as finally is a keyword in Python
+ TriggerFn.from_runner_api(getattr(proto.or_finally, 'finally'),
+ context))
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.Trigger(
+ or_finally=beam_runner_api_pb2.Trigger.OrFinally(
+ main=self.triggers[0].to_runner_api(context),
+ # dict keyword argument is used as finally is a keyword in Python
+ **{'finally': self.triggers[1].to_runner_api(context)}))
+
class TriggerContext(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 72bab2e..cc9e0f5 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -38,6 +38,7 @@ from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.transforms.trigger import GeneralTriggerDriver
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import TriggerFn
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
@@ -380,6 +381,38 @@ class TriggerTest(unittest.TestCase):
range(10))
+class RunnerApiTest(unittest.TestCase):
+
+ def test_trigger_encoding(self):
+ for trigger_fn in (
+ DefaultTrigger(),
+ AfterAll(AfterCount(1), AfterCount(10)),
+ AfterFirst(AfterCount(10), AfterCount(100)),
+ AfterWatermark(early=AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+ Repeatedly(AfterCount(100)),
+ trigger.OrFinally(AfterCount(3), AfterCount(10))):
+ context = beam.pipeline.PipelineContext()
+ self.assertEqual(
+ trigger_fn,
+ TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+ def test_windowing_strategy_encoding(self):
+ for trigger_fn in (
+ DefaultTrigger(),
+ AfterAll(AfterCount(1), AfterCount(10)),
+ AfterFirst(AfterCount(10), AfterCount(100)),
+ AfterEach(AfterCount(100), AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000)),
+ AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
+ Repeatedly(AfterCount(100)),
+ trigger.OrFinally(AfterCount(3), AfterCount(10))):
+ context = beam.pipeline.PipelineContext()
+ self.assertEqual(
+ trigger_fn,
+ TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
+
+
class TriggerPipelineTest(unittest.TestCase):
def test_after_count(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a562bcf..c763a96 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -70,9 +70,9 @@ from apache_beam.utils import urns
class OutputTimeFn(object):
"""Determines how output timestamps of grouping operations are assigned."""
- OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
- OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
- OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
+ OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
+ OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
+ OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
@staticmethod
@@ -116,6 +116,10 @@ class WindowFn(object):
"""Returns a window that is the result of merging a set of windows."""
raise NotImplementedError
+ def is_merging(self):
+ """Returns whether this WindowFn merges windows."""
+ return True
+
def get_window_coder(self):
return coders.WindowCoder()
@@ -267,7 +271,16 @@ class GlobalWindow(BoundedWindow):
return self is other or type(self) is type(other)
-class GlobalWindows(WindowFn):
+class NonMergingWindowFn(WindowFn):
+
+ def is_merging(self):
+ return False
+
+ def merge(self, merge_context):
+ pass # No merging.
+
+
+class GlobalWindows(NonMergingWindowFn):
"""A windowing function that assigns everything to one global window."""
@classmethod
@@ -277,9 +290,6 @@ class GlobalWindows(WindowFn):
def assign(self, assign_context):
return [GlobalWindow()]
- def merge(self, merge_context):
- pass # No merging.
-
def get_window_coder(self):
return coders.GlobalWindowCoder()
@@ -304,7 +314,7 @@ WindowFn.register_urn(
urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
-class FixedWindows(WindowFn):
+class FixedWindows(NonMergingWindowFn):
"""A windowing function that assigns each element to one time interval.
The attributes size and offset determine in what time interval a timestamp
@@ -329,9 +339,6 @@ class FixedWindows(WindowFn):
start = timestamp - (timestamp - self.offset) % self.size
return [IntervalWindow(start, start + self.size)]
- def merge(self, merge_context):
- pass # No merging.
-
def __eq__(self, other):
if type(self) == type(other) == FixedWindows:
return self.size == other.size and self.offset == other.offset
@@ -356,7 +363,7 @@ WindowFn.register_urn(
FixedWindows.from_runner_api_parameter)
-class SlidingWindows(WindowFn):
+class SlidingWindows(NonMergingWindowFn):
"""A windowing function that assigns each element to a set of sliding windows.
The attributes size and offset determine in what time interval a timestamp
@@ -384,9 +391,6 @@ class SlidingWindows(WindowFn):
return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
for s in range(start, start - self.size, -self.period)]
- def merge(self, merge_context):
- pass # No merging.
-
def __eq__(self, other):
if type(self) == type(other) == SlidingWindows:
return (self.size == other.size
http://git-wip-us.apache.org/repos/asf/beam/blob/5b86e1fc/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 821b143..c79739a 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,13 +28,17 @@ from apache_beam.transforms import Create
from apache_beam.transforms import GroupByKey
from apache_beam.transforms import Map
from apache_beam.transforms import WindowInto
+from apache_beam.transforms.core import Windowing
from apache_beam.transforms.timeutil import MAX_TIMESTAMP
from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import OutputTimeFn
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
@@ -226,7 +230,10 @@ class WindowTest(unittest.TestCase):
label='assert:mean')
p.run()
- def test_runner_api(self):
+
+class RunnerApiTest(unittest.TestCase):
+
+ def test_windowfn_encoding(self):
for window_fn in (GlobalWindows(),
FixedWindows(37),
SlidingWindows(2, 389),
@@ -236,5 +243,19 @@ class WindowTest(unittest.TestCase):
window_fn,
WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
+ def test_windowing_encoding(self):
+ for windowing in (
+ Windowing(GlobalWindows()),
+ Windowing(FixedWindows(1, 3), AfterCount(6),
+ accumulation_mode=AccumulationMode.ACCUMULATING),
+ Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
+ output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+ accumulation_mode=AccumulationMode.DISCARDING)):
+ context = pipeline.PipelineContext()
+ self.assertEqual(
+ windowing,
+ Windowing.from_runner_api(windowing.to_runner_api(context), context))
+
+
if __name__ == '__main__':
unittest.main()
[4/9] beam git commit: Auto-generated runner api proto bindings.
Posted by ro...@apache.org.
Auto-generated runner api proto bindings.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bb125e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bb125e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bb125e1
Branch: refs/heads/master
Commit: 3bb125e12d625216c234fe396168843e6669c1e5
Parents: f13a84d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:02:08 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:00 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/runners/api/__init__.py | 0
.../runners/api/beam_runner_api_pb2.py | 2755 ++++++++++++++++++
sdks/python/run_pylint.sh | 3 +-
3 files changed, 2757 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3bb125e1/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
new file mode 100644
index 0000000..e69de29
[9/9] beam git commit: Closes #2190
Posted by ro...@apache.org.
Closes #2190
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c2424cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c2424cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c2424cb
Branch: refs/heads/master
Commit: 2c2424cb44bb2976ea9099230106a639b5ee3993
Parents: f13a84d deff128
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 20:29:03 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:03 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 113 +
sdks/python/apache_beam/runners/api/__init__.py | 16 +
.../runners/api/beam_runner_api_pb2.py | 2772 ++++++++++++++++++
.../apache_beam/runners/pipeline_context.py | 88 +
.../runners/pipeline_context_test.py | 49 +
sdks/python/apache_beam/transforms/core.py | 39 +
sdks/python/apache_beam/transforms/trigger.py | 143 +-
.../apache_beam/transforms/trigger_test.py | 19 +
sdks/python/apache_beam/transforms/window.py | 147 +-
.../apache_beam/transforms/window_test.py | 32 +
sdks/python/apache_beam/utils/proto_utils.py | 54 +
sdks/python/apache_beam/utils/urns.py | 24 +
sdks/python/run_pylint.sh | 3 +-
13 files changed, 3481 insertions(+), 18 deletions(-)
----------------------------------------------------------------------