You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/20 18:58:33 UTC
[3/3] incubator-beam git commit: Add size-estimation support to Python SDK Coders
Add size-estimation support to Python SDK Coders
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48a19eb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48a19eb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48a19eb2
Branch: refs/heads/python-sdk
Commit: 48a19eb2d1db0a878778a822835a9ef5591a8785
Parents: 2212628
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 18 12:48:11 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 11:58:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 12 +++
sdks/python/apache_beam/coders/coder_impl.py | 85 +++++++++++++++++++-
sdks/python/apache_beam/coders/coders.py | 28 ++++++-
.../apache_beam/coders/coders_test_common.py | 34 ++++++++
4 files changed, 156 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index ae1caa4..05fdc89 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -38,6 +38,8 @@ cdef class CoderImpl(object):
cpdef decode_from_stream(self, InputStream stream, bint nested)
cpdef bytes encode(self, value)
cpdef decode(self, bytes encoded)
+ cpdef estimate_size(self, value)
+ cpdef get_estimated_size_and_observables(self, value)
cdef class SimpleCoderImpl(CoderImpl):
@@ -51,6 +53,7 @@ cdef class StreamCoderImpl(CoderImpl):
cdef class CallbackCoderImpl(CoderImpl):
cdef object _encoder
cdef object _decoder
+ cdef object _size_estimator
cdef class DeterministicPickleCoderImpl(CoderImpl):
@@ -91,6 +94,9 @@ cdef class SingletonCoderImpl(CoderImpl):
cdef object _value
+cdef int ESTIMATED_NESTED_OVERHEAD  # cdef: Cython does not allow cpdef on variables.
+
+
cdef class AbstractComponentCoderImpl(StreamCoderImpl):
cdef tuple _coder_impls
@@ -102,6 +108,9 @@ cdef class AbstractComponentCoderImpl(StreamCoderImpl):
@cython.locals(c=CoderImpl)
cpdef decode_from_stream(self, InputStream stream, bint nested)
+ @cython.locals(c=CoderImpl)
+ cpdef get_estimated_size_and_observables(self, value)
+
cdef class TupleCoderImpl(AbstractComponentCoderImpl):
pass
@@ -121,3 +130,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
cdef CoderImpl _value_coder
cdef CoderImpl _timestamp_coder
cdef CoderImpl _windows_coder
+
+ @cython.locals(c=CoderImpl)
+ cpdef get_estimated_size_and_observables(self, value)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 433fd81..d021e3b 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -28,6 +28,8 @@ coder_impl.pxd file for type hints.
import collections
from types import NoneType
+from apache_beam.coders import observable
+
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -64,6 +66,27 @@ class CoderImpl(object):
"""Encodes an object to an unnested string."""
raise NotImplementedError
+ def estimate_size(self, value):
+ """Estimates the encoded size of the given value, in bytes."""
+ # Default: encode and measure. Subclasses with a cheaper estimate (e.g. a
+ # fixed-size encoding) override this.
+ return len(self.encode(value))
+
+ def get_estimated_size_and_observables(self, value):
+ """Returns estimated size of value along with any nested observables.
+
+ The list of nested observables is returned as a list of 2-tuples of
+ (obj, coder_impl), where obj is an instance of observable.ObservableMixin,
+ and coder_impl is the CoderImpl that can be used to encode elements sent by
+ obj to its observers.
+
+ Arguments:
+ value: the value whose encoded size is to be estimated.
+
+ Returns:
+ The estimated encoded size of the given value and a list of observables
+ whose elements are 2-tuples of (obj, coder_impl) as described above.
+ """
+ return self.estimate_size(value), []
+
class SimpleCoderImpl(CoderImpl):
"""Subclass of CoderImpl implementing stream methods using encode/decode."""
@@ -96,9 +119,13 @@ class CallbackCoderImpl(CoderImpl):
is not overwritten.
"""
- def __init__(self, encoder, decoder):
+ def __init__(self, encoder, decoder, size_estimator=None):
self._encoder = encoder
self._decoder = decoder
+ # Any falsy size_estimator (e.g. None) falls back to encoding and measuring.
+ self._size_estimator = size_estimator or self._default_size_estimator
+
+ def _default_size_estimator(self, value):
+ """Fallback used when no size_estimator is given: len(self.encode(value))."""
+ return len(self.encode(value))
def encode_to_stream(self, value, stream, nested):
return stream.write(self._encoder(value), nested)
@@ -112,6 +139,9 @@ class CallbackCoderImpl(CoderImpl):
def decode(self, encoded):
return self._decoder(encoded)
+ def estimate_size(self, value):
+ return self._size_estimator(value)
+
class DeterministicPickleCoderImpl(CoderImpl):
@@ -252,6 +282,10 @@ class FloatCoderImpl(StreamCoderImpl):
def decode_from_stream(self, in_stream, nested):
return in_stream.read_bigendian_double()
+ def estimate_size(self, unused_value):
+ # A double is encoded as 8 bytes.
+ return 8
+
class TimestampCoderImpl(StreamCoderImpl):
@@ -264,6 +298,10 @@ class TimestampCoderImpl(StreamCoderImpl):
def decode_from_stream(self, in_stream, nested):
return self.timestamp_class(micros=in_stream.read_bigendian_int64())
+ def estimate_size(self, unused_value):
+ # A Timestamp is encoded as a 64-bit integer in 8 bytes.
+ return 8
+
small_ints = [chr(_) for _ in range(128)]
@@ -312,7 +350,13 @@ class SingletonCoderImpl(CoderImpl):
return self._value
+# Number of bytes of overhead estimated for encoding the nested size of a
+# component as a VarInt64. This is a fixed guess; the actual VarInt64 length
+# depends on the component's encoded size.
+ESTIMATED_NESTED_OVERHEAD = 2
+
+
class AbstractComponentCoderImpl(StreamCoderImpl):
+ """CoderImpl for coders that are composed of several component coders."""
def __init__(self, coder_impls):
for c in coder_impls:
@@ -338,6 +382,29 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
return self._construct_from_components(
[c.decode_from_stream(in_stream, True) for c in self._coder_impls])
+ def estimate_size(self, value):
+ """Estimates the encoded size of the given value, in bytes.
+
+ Observable components are not counted; see
+ get_estimated_size_and_observables.
+ """
+ estimated_size, _ = (
+ self.get_estimated_size_and_observables(value))
+ return estimated_size
+
+ def get_estimated_size_and_observables(self, value):
+ """Returns estimated size of value along with any nested observables."""
+ values = self._extract_components(value)
+ estimated_size = 0
+ observables = []
+ for i in range(0, len(self._coder_impls)):
+ child_value = values[i]
+ # Observable children are returned to the caller rather than sized here,
+ # so they add nothing (not even ESTIMATED_NESTED_OVERHEAD) to the total.
+ if isinstance(child_value, observable.ObservableMixin):
+ observables.append((child_value, self._coder_impls[i]))
+ else:
+ c = self._coder_impls[i] # type cast
+ child_size, child_observables = (
+ c.get_estimated_size_and_observables(child_value))
+ # Components are decoded with nested=True (see decode_from_stream), so
+ # each one is charged an estimated size-prefix overhead.
+ estimated_size += child_size + ESTIMATED_NESTED_OVERHEAD
+ observables += child_observables
+ return estimated_size, observables
+
class TupleCoderImpl(AbstractComponentCoderImpl):
"""A coder for tuple objects."""
@@ -396,3 +463,33 @@ class WindowedValueCoderImpl(StreamCoderImpl):
         self._value_coder.decode_from_stream(in_stream, True),
         self._timestamp_coder.decode_from_stream(in_stream, True),
         self._windows_coder.decode_from_stream(in_stream, True))
+
+  def estimate_size(self, value):
+    """Estimates the encoded size of the given value, in bytes.
+
+    Derived from get_estimated_size_and_observables (as in
+    AbstractComponentCoderImpl) so that the two methods agree, and so that an
+    observable value.value is not encoded just to be measured.
+    """
+    estimated_size, _ = self.get_estimated_size_and_observables(value)
+    return estimated_size
+
+  def get_estimated_size_and_observables(self, value):
+    """Returns estimated size of value along with any nested observables.
+
+    An observable value.value is not sized; it is returned paired with the
+    value coder impl so that the caller can observe its elements instead.
+    """
+    estimated_size = 0
+    observables = []
+    if isinstance(value.value, observable.ObservableMixin):
+      observables.append((value.value, self._value_coder))
+    else:
+      c = self._value_coder  # type cast
+      value_estimated_size, value_observables = (
+          c.get_estimated_size_and_observables(value.value))
+      estimated_size += value_estimated_size
+      observables += value_observables
+    estimated_size += self._timestamp_coder.estimate_size(value.timestamp)
+    estimated_size += self._windows_coder.estimate_size(value.windows)
+    return estimated_size, observables
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index a5ed7f9..11964b0 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -77,6 +77,26 @@ class Coder(object):
"""
return False
+ def estimate_size(self, value):
+ """Estimates the encoded size of the given value, in bytes.
+
+ Dataflow estimates the encoded size of a PCollection processed in a pipeline
+ step by using the estimated size of a random sample of elements in that
+ PCollection.
+
+ The default implementation encodes the given value and returns the length
+ of the encoding, which is exact but costs a full encode. A coder that can
+ estimate the encoded size more cheaply (e.g., if the encoding has a fixed
+ size) can override this method to improve performance. For coders that use
+ the default _create_impl, the override is also used by their CoderImpl.
+
+ Arguments:
+ value: the value whose encoded size is to be estimated.
+
+ Returns:
+ The estimated encoded size of the given value.
+ """
+ return len(self.encode(value))
+
# ===========================================================================
# Methods below are internal SDK details that don't need to be modified for
# user-defined coders.
@@ -85,7 +105,8 @@ class Coder(object):
def _create_impl(self):
"""Creates a CoderImpl to do the actual encoding and decoding.
"""
- return coder_impl.CallbackCoderImpl(self.encode, self.decode)
+ # Passing estimate_size lets a subclass's override drive the impl's
+ # estimate_size as well.
+ return coder_impl.CallbackCoderImpl(self.encode, self.decode,
+ self.estimate_size)
def get_impl(self):
if not hasattr(self, '_impl'):
@@ -191,7 +212,7 @@ class FastCoder(Coder):
"""Coder subclass used when a (faster) CoderImpl is supplied directly.
The Coder class defines _create_impl in terms of encode() and decode();
- this class inverts that defining encode() and decode() in terms of
+ this class inverts that by defining encode() and decode() in terms of
_create_impl().
"""
@@ -203,6 +224,9 @@ class FastCoder(Coder):
"""Decodes the given byte string into the corresponding object."""
return self.get_impl().decode(encoded)
+ def estimate_size(self, value):
+ """Estimates the encoded size of value by delegating to get_impl()."""
+ return self.get_impl().estimate_size(value)
+
def _create_impl(self):
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index deb5652..d55e8c2 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -24,6 +24,7 @@ import unittest
import dill
import coders
+import observable
# Defined out of line for picklability.
@@ -79,6 +80,10 @@ class CodersTest(unittest.TestCase):
self._observe(coder)
for v in values:
self.assertEqual(v, coder.decode(coder.encode(v)))
+ # The Coder-level and CoderImpl-level estimates must agree, and a plain
+ # (non-observable) value must report no nested observables.
+ self.assertEqual(coder.estimate_size(v),
+ coder.get_impl().estimate_size(v))
+ self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v),
+ (coder.get_impl().estimate_size(v), []))
copy1 = dill.loads(dill.dumps(coder))
copy2 = dill.loads(dill.dumps(coder))
for v in values:
@@ -186,6 +191,35 @@ class CodersTest(unittest.TestCase):
def test_utf8_coder(self):
self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
+ def test_nested_observables(self):
+ """Nested observables are reported paired with their element coder impl."""
+ # Minimal observable: iterating it yields 1, 2, 3.
+ class FakeObservableIterator(observable.ObservableMixin):
+
+ def __iter__(self):
+ return iter([1, 2, 3])
+
+ # Coder for elements from the observable iterator.
+ iter_coder = coders.VarIntCoder()
+
+ # Test nested WindowedValue observable.
+ coder = coders.WindowedValueCoder(iter_coder)
+ observ = FakeObservableIterator()
+ try:
+ value = coders.coder_impl.WindowedValue(observ)
+ except TypeError:
+ # We are running tests with a fake WindowedValue implementation so as to
+ # not pull in the rest of the SDK.
+ value = coders.coder_impl.WindowedValue(observ, 0, [])
+ self.assertEqual(
+ coder.get_impl().get_estimated_size_and_observables(value)[1],
+ [(observ, iter_coder.get_impl())])
+
+ # Test nested tuple observable.
+ coder = coders.TupleCoder((coders.StrUtf8Coder(), iter_coder))
+ value = (u'123', observ)
+ self.assertEqual(
+ coder.get_impl().get_estimated_size_and_observables(value)[1],
+ [(observ, iter_coder.get_impl())])
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)