You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/10 04:29:39 UTC
[7/9] beam git commit: Runner API encoding of WindowFns.
Runner API encoding of WindowFns.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aad32b7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aad32b7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aad32b7a
Branch: refs/heads/master
Commit: aad32b7a00d1aea1e7e51b68ff609d2fb3b82a8f
Parents: bc76a18
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Mar 7 12:21:02 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:01 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/window.py | 117 +++++++++++++++++++
.../apache_beam/transforms/window_test.py | 11 ++
sdks/python/apache_beam/utils/proto_utils.py | 37 ++++++
sdks/python/apache_beam/utils/urns.py | 7 ++
4 files changed, 172 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 14cf2f6..a562bcf 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,13 +49,20 @@ WindowFn.
from __future__ import absolute_import
+from google.protobuf import struct_pb2
+from google.protobuf import wrappers_pb2
+
from apache_beam import coders
+from apache_beam.internal import pickler
+from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import timeutil
from apache_beam.transforms.timeutil import Duration
from apache_beam.transforms.timeutil import MAX_TIMESTAMP
from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.timeutil import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
@@ -131,6 +138,41 @@ class WindowFn(object):
# By default, just return the input timestamp.
return input_timestamp
+  # Maps urn -> (parameter proto class or None, constructor).  register_urn
+  # mutates this dict in place, so every subclass that does not rebind
+  # _known_urns shares this single registry.
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    """Registers how to decode WindowFns whose FunctionSpec carries urn.
+
+    Args:
+      urn: urn string identifying the encoding.
+      parameter_type: protobuf message class packed in the spec's Any
+        parameter, or None when the encoding has no parameter.
+      constructor: callable taking (unpacked_parameter, context) and
+        returning a WindowFn.
+    """
+    cls._known_urns[urn] = parameter_type, constructor
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    """Reconstructs a WindowFn from a FunctionSpec built by to_runner_api.
+
+    Raises:
+      KeyError: if fn_proto.spec.urn has not been registered.
+    """
+    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+    return constructor(
+        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        context)
+
+  def to_runner_api(self, context):
+    """Encodes this WindowFn as a FunctionSpec proto."""
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        spec=beam_runner_api_pb2.UrnWithParameter(
+            urn=urn,
+            parameter=proto_utils.pack_Any(typed_param)))
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Decodes a pickled WindowFn from a BytesValue parameter."""
+    # NOTE: this unpickles the payload; only decode protos that come from a
+    # trusted source.
+    return pickler.loads(fn_parameter.value)
+
+  def to_runner_api_parameter(self, context):
+    """Returns (urn, typed_parameter) describing this WindowFn.
+
+    Subclasses that have a dedicated urn override this.  The default falls
+    back to pickling the whole WindowFn, so any WindowFn can be encoded.
+    """
+    return (urns.PICKLED_WINDOW_FN,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+
+WindowFn.register_urn(
+    urns.PICKLED_WINDOW_FN,
+    wrappers_pb2.BytesValue,
+    WindowFn.from_runner_api_parameter)
+
class BoundedWindow(object):
"""A window for timestamps in range (-infinity, end).
@@ -251,6 +293,16 @@ class GlobalWindows(WindowFn):
def __ne__(self, other):
return not self == other
+  @staticmethod
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    """Rebuilds a GlobalWindows; its encoding carries no configuration."""
+    return GlobalWindows()
+
+  def to_runner_api_parameter(self, context):
+    """Returns the global-windows urn paired with no parameter message."""
+    return (urns.GLOBAL_WINDOWS_FN, None)
+
+WindowFn.register_urn(
+    urn=urns.GLOBAL_WINDOWS_FN,
+    parameter_type=None,
+    constructor=GlobalWindows.from_runner_api_parameter)
+
class FixedWindows(WindowFn):
"""A windowing function that assigns each element to one time interval.
@@ -280,6 +332,29 @@ class FixedWindows(WindowFn):
def merge(self, merge_context):
pass # No merging.
+  def __eq__(self, other):
+    """Compares size and offset with another FixedWindows.
+
+    Instances of any other type, including subclasses, compare unequal.
+    """
+    if type(self) == type(other) == FixedWindows:
+      return self.size == other.size and self.offset == other.offset
+    # Return an explicit bool instead of implicitly returning None.
+    return False
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so define it explicitly.
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a FixedWindows from a Struct of microsecond values."""
+    return FixedWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']))
+
+  def to_runner_api_parameter(self, context):
+    """Encodes size and offset, in microseconds, as a Struct."""
+    # Struct numbers are doubles, so micros above 2**53 would lose precision.
+    return (urns.FIXED_WINDOWS_FN,
+            proto_utils.pack_Struct(size=self.size.micros,
+                                    offset=self.offset.micros))
+
+WindowFn.register_urn(
+    urns.FIXED_WINDOWS_FN,
+    struct_pb2.Struct,
+    FixedWindows.from_runner_api_parameter)
+
class SlidingWindows(WindowFn):
"""A windowing function that assigns each element to a set of sliding windows.
@@ -312,6 +387,31 @@ class SlidingWindows(WindowFn):
def merge(self, merge_context):
pass # No merging.
+  def __eq__(self, other):
+    """Compares size, offset and period with another SlidingWindows.
+
+    Instances of any other type, including subclasses, compare unequal.
+    """
+    if type(self) == type(other) == SlidingWindows:
+      return (self.size == other.size
+              and self.offset == other.offset
+              and self.period == other.period)
+    # Return an explicit bool instead of implicitly returning None.
+    return False
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so define it explicitly.
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a SlidingWindows from a Struct of microsecond values."""
+    return SlidingWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']),
+        period=Duration(micros=fn_parameter['period']))
+
+  def to_runner_api_parameter(self, context):
+    """Encodes size, offset and period, in microseconds, as a Struct."""
+    return (urns.SLIDING_WINDOWS_FN,
+            proto_utils.pack_Struct(
+                size=self.size.micros,
+                offset=self.offset.micros,
+                period=self.period.micros))
+
+WindowFn.register_urn(
+    urns.SLIDING_WINDOWS_FN,
+    struct_pb2.Struct,
+    SlidingWindows.from_runner_api_parameter)
+
class Sessions(WindowFn):
"""A windowing function that groups elements into sessions.
@@ -352,3 +452,20 @@ class Sessions(WindowFn):
end = w.end
if len(to_merge) > 1:
merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end))
+
+  def __eq__(self, other):
+    """Compares gap_size with another Sessions.
+
+    Instances of any other type, including subclasses, compare unequal.
+    """
+    if type(self) == type(other) == Sessions:
+      return self.gap_size == other.gap_size
+    # Return an explicit bool instead of implicitly returning None.
+    return False
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so define it explicitly.
+    return not self == other
+
+  @staticmethod
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    """Rebuilds a Sessions from a Struct holding gap_size in micros."""
+    return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
+
+  def to_runner_api_parameter(self, context):
+    """Encodes gap_size, in microseconds, as a Struct."""
+    return (urns.SESSION_WINDOWS_FN,
+            proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+
+WindowFn.register_urn(
+    urns.SESSION_WINDOWS_FN,
+    struct_pb2.Struct,
+    Sessions.from_runner_api_parameter)
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c4072ac..821b143 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,6 +19,7 @@
import unittest
+from apache_beam import pipeline
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
@@ -32,6 +33,7 @@ from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
@@ -224,6 +226,15 @@ class WindowTest(unittest.TestCase):
label='assert:mean')
p.run()
+ def test_runner_api(self):
+ """Checks built-in WindowFns survive a to_runner_api/from_runner_api round trip."""
+ for window_fn in (GlobalWindows(),
+ FixedWindows(37),
+ SlidingWindows(2, 389),
+ Sessions(5077)):
+ context = pipeline.PipelineContext()
+ # NOTE(review): only == is asserted here; != and the pickled-WindowFn
+ # fallback urn are not exercised by this test.
+ self.assertEqual(
+ window_fn,
+ WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
new file mode 100644
index 0000000..0ece8f5
--- /dev/null
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -0,0 +1,37 @@
+from google.protobuf import any_pb2
+from google.protobuf import struct_pb2
+
+
+def pack_Any(msg):
+  """Wraps a protobuf message in a google.protobuf.Any.
+
+  Args:
+    msg: the protobuf message to pack, or None.
+
+  Returns:
+    An any_pb2.Any holding msg, or None when msg is None.
+  """
+  if msg is not None:
+    wrapper = any_pb2.Any()
+    wrapper.Pack(msg)
+    return wrapper
+  return None
+
+
+def unpack_Any(any_msg, msg_class):
+  """Unpacks any_msg into a new instance of msg_class.
+
+  Args:
+    any_msg: an any_pb2.Any expected to hold a msg_class message.
+    msg_class: the protobuf message class to unpack into, or None.
+
+  Returns:
+    The unpacked message, or None if msg_class is None.
+
+  Raises:
+    ValueError: if any_msg does not hold a message of type msg_class.
+  """
+  if msg_class is None:
+    return None
+  msg = msg_class()
+  # Any.Unpack reports a type mismatch by returning False rather than
+  # raising; ignoring that would silently yield an empty default message.
+  if not any_msg.Unpack(msg):
+    raise ValueError(
+        'Expected an Any holding %s, got type_url %r' % (
+            msg_class.DESCRIPTOR.full_name, any_msg.type_url))
+  return msg
+
+
+def pack_Struct(**kwargs):
+  """Builds a google.protobuf.Struct from keyword arguments.
+
+  Each keyword becomes a field of the returned Struct, set to the
+  corresponding argument value.
+  """
+  result = struct_pb2.Struct()
+  for name, value in kwargs.items():
+    # Struct supports item assignment, which pylint does not recognize.
+    result[name] = value  # pylint: disable=unsubscriptable-object
+  return result
http://git-wip-us.apache.org/repos/asf/beam/blob/aad32b7a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
new file mode 100644
index 0000000..4d1c2f7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -0,0 +1,7 @@
+# Urn strings naming Runner API encodings; transforms/window.py uses the
+# *_WINDOW(S)_FN values as the urn of a WindowFn's FunctionSpec.
+PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+
+# NOTE(review): unlike the urns above, this one uses a "dataflow:" prefix
+# rather than "beam:"; confirm that is intentional.
+PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"