You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 22:49:13 UTC
[beam] branch master updated: Allow more efficient encoding of generic iterable types.
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cd77c1e Allow more efficient encoding of generic iterable types.
new 897ff50 Merge pull request #7427 More efficient generic iterables
cd77c1e is described below
commit cd77c1e244aeb765fb43ece5637f39f4e6049dfd
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Jan 7 14:59:42 2019 +0100
Allow more efficient encoding of generic iterable types.
---
sdks/python/apache_beam/coders/coder_impl.pxd | 5 +-
sdks/python/apache_beam/coders/coder_impl.py | 64 +++++++++++++++-------
.../apache_beam/runners/worker/bundle_processor.py | 7 +++
3 files changed, 54 insertions(+), 22 deletions(-)
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 9d5ac80..c5ce4e8 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -74,10 +74,13 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl):
cdef object NoneType
cdef unsigned char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE
cdef unsigned char BYTES_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE
-cdef unsigned char SET_TYPE
+cdef unsigned char SET_TYPE, ITERABLE_LIKE_TYPE
+
+cdef set _ITERABLE_LIKE_TYPES
cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef CoderImpl fallback_coder_impl
+ cdef CoderImpl iterable_coder_impl
@cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
unicode_value=unicode)
cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 244b82c..d419038 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -282,6 +282,14 @@ LIST_TYPE = 5
TUPLE_TYPE = 6
DICT_TYPE = 7
SET_TYPE = 8
+ITERABLE_LIKE_TYPE = 10
+
+
+# Types that can be encoded as iterables but are not literally lists,
+# tuples, etc., because they are lazy. Encoding preserves only the
+# elements, not the actual type. This is particularly useful for the
+# value list types created in GroupByKey.
+_ITERABLE_LIKE_TYPES = set()
class FastPrimitivesCoderImpl(StreamCoderImpl):
@@ -289,6 +297,11 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
def __init__(self, fallback_coder_impl):
self.fallback_coder_impl = fallback_coder_impl
+ self.iterable_coder_impl = IterableCoderImpl(self)
+
+ @staticmethod
+ def register_iterable_like_type(t):
+ _ITERABLE_LIKE_TYPES.add(t)
def get_estimated_size_and_observables(self, value, nested=False):
if isinstance(value, observable.ObservableMixin):
@@ -346,6 +359,9 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
elif t is bool:
stream.write_byte(BOOL_TYPE)
stream.write_byte(value)
+ elif t in _ITERABLE_LIKE_TYPES:
+ stream.write_byte(ITERABLE_LIKE_TYPE)
+ self.iterable_coder_impl.encode_to_stream(value, stream, nested)
else:
stream.write_byte(UNKNOWN_TYPE)
self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -379,6 +395,8 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
return v
elif t == BOOL_TYPE:
return not not stream.read_byte()
+ elif t == ITERABLE_LIKE_TYPE:
+ return self.iterable_coder_impl.decode_from_stream(stream, nested)
elif t == UNKNOWN_TYPE:
return self.fallback_coder_impl.decode_from_stream(stream, nested)
else:
@@ -615,6 +633,30 @@ class TupleCoderImpl(AbstractComponentCoderImpl):
return tuple(components)
+class _ConcatSequence(object):
+ def __init__(self, head, tail):
+ self._head = head
+ self._tail = tail
+
+ def __iter__(self):
+ for elem in self._head:
+ yield elem
+ for elem in self._tail:
+ yield elem
+
+ def __eq__(self, other):
+ return list(self) == list(other)
+
+ def __hash__(self):
+ raise NotImplementedError
+
+ def __reduce__(self):
+ return list, (list(self),)
+
+
+FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatSequence)
+
+
class SequenceCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
@@ -725,28 +767,8 @@ class SequenceCoderImpl(StreamCoderImpl):
raise ValueError(
'Cannot read state-written iterable without state reader.')
- class FullIterable(object):
- def __init__(self, head, tail):
- self._head = head
- self._tail = tail
-
- def __iter__(self):
- for elem in self._head:
- yield elem
- for elem in self._tail:
- yield elem
-
- def __eq__(self, other):
- return list(self) == list(other)
-
- def __hash__(self):
- raise NotImplementedError
-
- def __reduce__(self):
- return list, (list(self),)
-
state_token = in_stream.read_all(True)
- elements = FullIterable(
+ elements = _ConcatSequence(
elements, self._read_state(state_token, self._elem_coder))
return self._construct_from_sequence(elements)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 679b8ea..b3b91ee 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -151,6 +151,10 @@ class _StateBackedIterable(object):
return list, (list(self),)
+coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(
+ _StateBackedIterable)
+
+
class StateBackedSideInputMap(object):
def __init__(self, state_handler, transform_id, tag, side_input_data, coder):
self._state_handler = state_handler
@@ -269,6 +273,9 @@ class _ConcatIterable(object):
yield elem
+coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatIterable)
+
+
# TODO(BEAM-5428): Implement cross-bundle state caching.
class SynchronousBagRuntimeState(userstate.RuntimeState):
def __init__(self, state_handler, state_key, value_coder):