You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this export.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 22:49:13 UTC

[beam] branch master updated: Allow more efficient encoding of generic iterable types.

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd77c1e  Allow more efficient encoding of generic iterable types.
     new 897ff50  Merge pull request #7427 More efficient generic iterables
cd77c1e is described below

commit cd77c1e244aeb765fb43ece5637f39f4e6049dfd
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Jan 7 14:59:42 2019 +0100

    Allow more efficient encoding of generic iterable types.
---
 sdks/python/apache_beam/coders/coder_impl.pxd      |  5 +-
 sdks/python/apache_beam/coders/coder_impl.py       | 64 +++++++++++++++-------
 .../apache_beam/runners/worker/bundle_processor.py |  7 +++
 3 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 9d5ac80..c5ce4e8 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -74,10 +74,13 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl):
 cdef object NoneType
 cdef unsigned char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE
 cdef unsigned char BYTES_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE
-cdef unsigned char SET_TYPE
+cdef unsigned char SET_TYPE, ITERABLE_LIKE_TYPE
+
+cdef set _ITERABLE_LIKE_TYPES
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
+  cdef CoderImpl iterable_coder_impl
   @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
                  unicode_value=unicode)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 244b82c..d419038 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -282,6 +282,14 @@ LIST_TYPE = 5
 TUPLE_TYPE = 6
 DICT_TYPE = 7
 SET_TYPE = 8
+ITERABLE_LIKE_TYPE = 10
+
+
+# Types that can be encoded as iterables, but are not literally
+# lists, etc. due to being lazy.  The actual type is not preserved
+# through encoding, only the elements. This is particularly useful
+# for the value list types created in GroupByKey.
+_ITERABLE_LIKE_TYPES = set()
 
 
 class FastPrimitivesCoderImpl(StreamCoderImpl):
@@ -289,6 +297,11 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
 
   def __init__(self, fallback_coder_impl):
     self.fallback_coder_impl = fallback_coder_impl
+    self.iterable_coder_impl = IterableCoderImpl(self)
+
+  @staticmethod
+  def register_iterable_like_type(t):
+    _ITERABLE_LIKE_TYPES.add(t)
 
   def get_estimated_size_and_observables(self, value, nested=False):
     if isinstance(value, observable.ObservableMixin):
@@ -346,6 +359,9 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
     elif t is bool:
       stream.write_byte(BOOL_TYPE)
       stream.write_byte(value)
+    elif t in _ITERABLE_LIKE_TYPES:
+      stream.write_byte(ITERABLE_LIKE_TYPE)
+      self.iterable_coder_impl.encode_to_stream(value, stream, nested)
     else:
       stream.write_byte(UNKNOWN_TYPE)
       self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -379,6 +395,8 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
       return v
     elif t == BOOL_TYPE:
       return not not stream.read_byte()
+    elif t == ITERABLE_LIKE_TYPE:
+      return self.iterable_coder_impl.decode_from_stream(stream, nested)
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
@@ -615,6 +633,30 @@ class TupleCoderImpl(AbstractComponentCoderImpl):
     return tuple(components)
 
 
+class _ConcatSequence(object):
+  def __init__(self, head, tail):
+    self._head = head
+    self._tail = tail
+
+  def __iter__(self):
+    for elem in self._head:
+      yield elem
+    for elem in self._tail:
+      yield elem
+
+  def __eq__(self, other):
+    return list(self) == list(other)
+
+  def __hash__(self):
+    raise NotImplementedError
+
+  def __reduce__(self):
+    return list, (list(self),)
+
+
+FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatSequence)
+
+
 class SequenceCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -725,28 +767,8 @@ class SequenceCoderImpl(StreamCoderImpl):
           raise ValueError(
               'Cannot read state-written iterable without state reader.')
 
-        class FullIterable(object):
-          def __init__(self, head, tail):
-            self._head = head
-            self._tail = tail
-
-          def __iter__(self):
-            for elem in self._head:
-              yield elem
-            for elem in self._tail:
-              yield elem
-
-          def __eq__(self, other):
-            return list(self) == list(other)
-
-          def __hash__(self):
-            raise NotImplementedError
-
-          def __reduce__(self):
-            return list, (list(self),)
-
         state_token = in_stream.read_all(True)
-        elements = FullIterable(
+        elements = _ConcatSequence(
             elements, self._read_state(state_token, self._elem_coder))
 
     return self._construct_from_sequence(elements)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 679b8ea..b3b91ee 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -151,6 +151,10 @@ class _StateBackedIterable(object):
     return list, (list(self),)
 
 
+coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(
+    _StateBackedIterable)
+
+
 class StateBackedSideInputMap(object):
   def __init__(self, state_handler, transform_id, tag, side_input_data, coder):
     self._state_handler = state_handler
@@ -269,6 +273,9 @@ class _ConcatIterable(object):
       yield elem
 
 
+coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatIterable)
+
+
 # TODO(BEAM-5428): Implement cross-bundle state caching.
 class SynchronousBagRuntimeState(userstate.RuntimeState):
   def __init__(self, state_handler, state_key, value_coder):