You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/20 15:55:20 UTC
[2/6] beam git commit: Require deterministic window coders.
Require deterministic window coders.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2
Branch: refs/heads/master
Commit: d98294c2bd13b45522ea584485bd62e900144c88
Parents: 72f5020
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:49:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:03 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 16 ----------------
.../apache_beam/coders/coders_test_common.py | 1 -
sdks/python/apache_beam/transforms/core.py | 4 ++++
sdks/python/apache_beam/transforms/window.py | 18 +++++++++++++++++-
4 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8ef0a46..4f75182 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -688,22 +688,6 @@ class IterableCoder(FastCoder):
return hash((type(self), self._elem_coder))
-class WindowCoder(PickleCoder):
- """Coder for windows in windowed values."""
-
- def _create_impl(self):
- return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
- def is_deterministic(self):
- # Note that WindowCoder as implemented is not deterministic because the
- # implementation simply pickles windows. See the corresponding comments
- # on PickleCoder for more details.
- return False
-
- def as_cloud_object(self):
- return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
class GlobalWindowCoder(SingletonCoder):
"""Coder for global windows."""
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 6491ea8..da0bde3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase):
coders.FastCoder,
coders.ProtoCoder,
coders.ToStringCoder,
- coders.WindowCoder,
coders.IntervalWindowCoder])
assert not standard - cls.seen, standard - cls.seen
assert not standard - cls.seen_nested, standard - cls.seen_nested
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d28eec..9f66c39 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1185,6 +1185,10 @@ class Windowing(object):
else:
raise ValueError(
'accumulation_mode must be provided for non-trivial triggers')
+ if not windowfn.get_window_coder().is_deterministic():
+ raise ValueError(
+ 'window fn (%s) does not have a deterministic coder (%s)' % (
+ windowfn, windowfn.get_window_coder()))
self.windowfn = windowfn
self.triggerfn = triggerfn
self.accumulation_mode = accumulation_mode
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 931a17d..643cb99 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,6 +49,8 @@ WindowFn.
from __future__ import absolute_import
+import abc
+
from google.protobuf import struct_pb2
from google.protobuf import wrappers_pb2
@@ -93,6 +95,8 @@ class OutputTimeFn(object):
class WindowFn(object):
"""An abstract windowing function defining a basic assign and merge."""
+ __metaclass__ = abc.ABCMeta
+
class AssignContext(object):
"""Context passed to WindowFn.assign()."""
@@ -100,6 +104,7 @@ class WindowFn(object):
self.timestamp = Timestamp.of(timestamp)
self.element = element
+ @abc.abstractmethod
def assign(self, assign_context):
"""Associates a timestamp to an element."""
raise NotImplementedError
@@ -113,6 +118,7 @@ class WindowFn(object):
def merge(self, to_be_merged, merge_result):
raise NotImplementedError
+ @abc.abstractmethod
def merge(self, merge_context):
"""Returns a window that is the result of merging a set of windows."""
raise NotImplementedError
@@ -121,8 +127,9 @@ class WindowFn(object):
"""Returns whether this WindowFn merges windows."""
return True
+ @abc.abstractmethod
def get_window_coder(self):
- return coders.WindowCoder()
+ raise NotImplementedError
def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument
"""Given input time and output window, returns output time for window.
@@ -344,6 +351,9 @@ class FixedWindows(NonMergingWindowFn):
start = timestamp - (timestamp - self.offset) % self.size
return [IntervalWindow(start, start + self.size)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def __eq__(self, other):
if type(self) == type(other) == FixedWindows:
return self.size == other.size and self.offset == other.offset
@@ -398,6 +408,9 @@ class SlidingWindows(NonMergingWindowFn):
for s in range(start.micros, timestamp.micros - self.size.micros,
-self.period.micros)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def __eq__(self, other):
if type(self) == type(other) == SlidingWindows:
return (self.size == other.size
@@ -443,6 +456,9 @@ class Sessions(WindowFn):
timestamp = context.timestamp
return [IntervalWindow(timestamp, timestamp + self.gap_size)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def merge(self, merge_context):
to_merge = []
end = timeutil.MIN_TIMESTAMP