You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/20 15:55:20 UTC

[2/6] beam git commit: Require deterministic window coders.

Require deterministic window coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2

Branch: refs/heads/master
Commit: d98294c2bd13b45522ea584485bd62e900144c88
Parents: 72f5020
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:49:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:03 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py          | 16 ----------------
 .../apache_beam/coders/coders_test_common.py      |  1 -
 sdks/python/apache_beam/transforms/core.py        |  4 ++++
 sdks/python/apache_beam/transforms/window.py      | 18 +++++++++++++++++-
 4 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8ef0a46..4f75182 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -688,22 +688,6 @@ class IterableCoder(FastCoder):
     return hash((type(self), self._elem_coder))
 
 
-class WindowCoder(PickleCoder):
-  """Coder for windows in windowed values."""
-
-  def _create_impl(self):
-    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
-  def is_deterministic(self):
-    # Note that WindowCoder as implemented is not deterministic because the
-    # implementation simply pickles windows.  See the corresponding comments
-    # on PickleCoder for more details.
-    return False
-
-  def as_cloud_object(self):
-    return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
 class GlobalWindowCoder(SingletonCoder):
   """Coder for global windows."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 6491ea8..da0bde3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase):
                      coders.FastCoder,
                      coders.ProtoCoder,
                      coders.ToStringCoder,
-                     coders.WindowCoder,
                      coders.IntervalWindowCoder])
     assert not standard - cls.seen, standard - cls.seen
     assert not standard - cls.seen_nested, standard - cls.seen_nested

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d28eec..9f66c39 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1185,6 +1185,10 @@ class Windowing(object):
       else:
         raise ValueError(
             'accumulation_mode must be provided for non-trivial triggers')
+    if not windowfn.get_window_coder().is_deterministic():
+      raise ValueError(
+          'window fn (%s) does not have a deterministic coder (%s)' % (
+              windowfn, windowfn.get_window_coder()))
     self.windowfn = windowfn
     self.triggerfn = triggerfn
     self.accumulation_mode = accumulation_mode

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 931a17d..643cb99 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,6 +49,8 @@ WindowFn.
 
 from __future__ import absolute_import
 
+import abc
+
 from google.protobuf import struct_pb2
 from google.protobuf import wrappers_pb2
 
@@ -93,6 +95,8 @@ class OutputTimeFn(object):
 class WindowFn(object):
   """An abstract windowing function defining a basic assign and merge."""
 
+  __metaclass__ = abc.ABCMeta
+
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
 
@@ -100,6 +104,7 @@ class WindowFn(object):
       self.timestamp = Timestamp.of(timestamp)
       self.element = element
 
+  @abc.abstractmethod
   def assign(self, assign_context):
     """Associates a timestamp to an element."""
     raise NotImplementedError
@@ -113,6 +118,7 @@ class WindowFn(object):
     def merge(self, to_be_merged, merge_result):
       raise NotImplementedError
 
+  @abc.abstractmethod
   def merge(self, merge_context):
     """Returns a window that is the result of merging a set of windows."""
     raise NotImplementedError
@@ -121,8 +127,9 @@ class WindowFn(object):
     """Returns whether this WindowFn merges windows."""
     return True
 
+  @abc.abstractmethod
   def get_window_coder(self):
-    return coders.WindowCoder()
+    raise NotImplementedError
 
   def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
     """Given input time and output window, returns output time for window.
@@ -344,6 +351,9 @@ class FixedWindows(NonMergingWindowFn):
     start = timestamp - (timestamp - self.offset) % self.size
     return [IntervalWindow(start, start + self.size)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def __eq__(self, other):
     if type(self) == type(other) == FixedWindows:
       return self.size == other.size and self.offset == other.offset
@@ -398,6 +408,9 @@ class SlidingWindows(NonMergingWindowFn):
         for s in range(start.micros, timestamp.micros - self.size.micros,
                        -self.period.micros)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:
       return (self.size == other.size
@@ -443,6 +456,9 @@ class Sessions(WindowFn):
     timestamp = context.timestamp
     return [IntervalWindow(timestamp, timestamp + self.gap_size)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def merge(self, merge_context):
     to_merge = []
     end = timeutil.MIN_TIMESTAMP