You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/20 18:58:32 UTC
[2/3] incubator-beam git commit: Address Robert's comments.
Address Robert's comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3b6fe4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3b6fe4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3b6fe4b
Branch: refs/heads/python-sdk
Commit: a3b6fe4b3bffac39841403a3113fb052b8cb39ef
Parents: 48a19eb
Author: Charles Chen <cc...@google.com>
Authored: Tue Jul 19 17:54:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 11:58:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 16 ++--
sdks/python/apache_beam/coders/coder_impl.py | 90 +++++++++++++++-----
.../apache_beam/coders/coders_test_common.py | 2 +
sdks/python/apache_beam/coders/slow_stream.py | 15 ++++
sdks/python/apache_beam/coders/stream.pxd | 2 +
sdks/python/apache_beam/coders/stream.pyx | 11 +++
6 files changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 05fdc89..2ca6743 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -28,7 +28,7 @@ cimport libc.string
from .stream cimport InputStream, OutputStream
-cdef object loads, dumps, create_InputStream, create_OutputStream
+cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
# Temporarily untyped to allow monkeypatching on failed import.
#cdef type WindowedValue
@@ -38,8 +38,11 @@ cdef class CoderImpl(object):
cpdef decode_from_stream(self, InputStream stream, bint nested)
cpdef bytes encode(self, value)
cpdef decode(self, bytes encoded)
- cpdef estimate_size(self, value)
- cpdef get_estimated_size_and_observables(self, value)
+ cpdef estimate_size(self, value, bint nested=?)
+ @cython.locals(varint_size=int, bits=libc.stdint.uint64_t)
+ @cython.overflowcheck(False)
+ cpdef int _get_nested_size(self, int inner_size, bint nested)
+ cpdef get_estimated_size_and_observables(self, value, bint nested=?)
cdef class SimpleCoderImpl(CoderImpl):
@@ -94,9 +97,6 @@ cdef class SingletonCoderImpl(CoderImpl):
cdef object _value
-cpdef int ESTIMATED_NESTED_OVERHEAD
-
-
cdef class AbstractComponentCoderImpl(StreamCoderImpl):
cdef tuple _coder_impls
@@ -109,7 +109,7 @@ cdef class AbstractComponentCoderImpl(StreamCoderImpl):
cpdef decode_from_stream(self, InputStream stream, bint nested)
@cython.locals(c=CoderImpl)
- cpdef get_estimated_size_and_observables(self, value)
+ cpdef get_estimated_size_and_observables(self, value, bint nested=?)
cdef class TupleCoderImpl(AbstractComponentCoderImpl):
@@ -132,4 +132,4 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
cdef CoderImpl _windows_coder
@cython.locals(c=CoderImpl)
- cpdef get_estimated_size_and_observables(self, value)
+ cpdef get_estimated_size_and_observables(self, value, bint nested=?)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index d021e3b..0024fd8 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -39,12 +39,17 @@ except ImportError:
WindowedValue = collections.namedtuple(
'WindowedValue', ('value', 'timestamp', 'windows'))
+
try:
from stream import InputStream as create_InputStream
from stream import OutputStream as create_OutputStream
+ from stream import ByteCountingOutputStream
+ from stream import get_varint_size
except ImportError:
from slow_stream import InputStream as create_InputStream
from slow_stream import OutputStream as create_OutputStream
+ from slow_stream import ByteCountingOutputStream
+ from slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -66,11 +71,17 @@ class CoderImpl(object):
"""Encodes an object to an unnested string."""
raise NotImplementedError
def estimate_size(self, value, nested=False):
  """Returns an estimate, in bytes, of the encoding of `value`.

  This default implementation performs a full unnested `encode` and measures
  the resulting byte string, then lets `_get_nested_size` adjust that figure
  for the requested nesting.

  Args:
    value: the value whose encoded size is wanted.
    nested: whether the size should be for a nested encoding.

  Returns:
    The estimated encoded size in bytes.
  """
  encoded_length = len(self.encode(value))
  return self._get_nested_size(encoded_length, nested)
+ def _get_nested_size(self, inner_size, nested):
+ if not nested:
+ return inner_size
+ varint_size = get_varint_size(inner_size)
+ return varint_size + inner_size
- def get_estimated_size_and_observables(self, value):
+ def get_estimated_size_and_observables(self, value, nested=False):
"""Returns estimated size of value along with any nested observables.
The list of nested observables is returned as a list of 2-tuples of
@@ -80,12 +91,13 @@ class CoderImpl(object):
Arguments:
value: the value whose encoded size is to be estimated.
+ nested: whether the value is nested.
Returns:
The estimated encoded size of the given value and a list of observables
whose elements are 2-tuples of (obj, coder_impl) as described above.
"""
- return self.estimate_size(value), []
+ return self.estimate_size(value, nested), []
class SimpleCoderImpl(CoderImpl):
@@ -111,6 +123,12 @@ class StreamCoderImpl(CoderImpl):
def decode(self, encoded):
  """Decodes an unnested byte string by reading it back through a stream."""
  in_stream = create_InputStream(encoded)
  return self.decode_from_stream(in_stream, False)

def estimate_size(self, value, nested=False):
  """Returns the encoded size of `value` by encoding it and counting bytes.

  The value is written with `encode_to_stream` into a
  ByteCountingOutputStream, and that stream's count is reported, so the
  result reflects the requested nesting.
  """
  counter = ByteCountingOutputStream()
  self.encode_to_stream(value, counter, nested)
  return counter.get_count()
class CallbackCoderImpl(CoderImpl):
"""A CoderImpl that calls back to the _impl methods on the Coder itself.
@@ -139,8 +157,8 @@ class CallbackCoderImpl(CoderImpl):
def decode(self, encoded):
  """Decodes `encoded` by calling the coder-supplied decoder callback."""
  decoder = self._decoder
  return decoder(encoded)

def estimate_size(self, value, nested=False):
  """Estimates the encoded size using the coder-supplied size estimator.

  The callback's figure is treated as the unnested size and adjusted for
  `nested` by `_get_nested_size`.
  """
  unnested_estimate = self._size_estimator(value)
  return self._get_nested_size(unnested_estimate, nested)
class DeterministicPickleCoderImpl(CoderImpl):
@@ -282,8 +300,8 @@ class FloatCoderImpl(StreamCoderImpl):
def decode_from_stream(self, in_stream, nested):
  """Reads a big-endian double from `in_stream`; `nested` is not consulted."""
  value = in_stream.read_bigendian_double()
  return value

def estimate_size(self, unused_value, nested=False):
  """Always 8: a double occupies eight bytes whether nested or not."""
  return 8
@@ -298,8 +316,9 @@ class TimestampCoderImpl(StreamCoderImpl):
def decode_from_stream(self, in_stream, nested):
  """Reads a big-endian 64-bit microsecond count and builds a timestamp.

  The count is passed as the `micros` keyword to `self.timestamp_class`;
  `nested` is not consulted.
  """
  micros = in_stream.read_bigendian_int64()
  return self.timestamp_class(micros=micros)

def estimate_size(self, unused_value, nested=False):
  """Always 8: the timestamp is a 64-bit integer, independent of nesting."""
  return 8
@@ -329,6 +348,10 @@ class VarIntCoderImpl(StreamCoderImpl):
return i
return StreamCoderImpl.decode(self, encoded)
def estimate_size(self, value, nested=False):
  """Returns the VarInt-encoded size of `value`.

  A VarInt carries no extra length prefix when nested, so `nested` does not
  affect the result.
  """
  size = get_varint_size(value)
  return size
class SingletonCoderImpl(CoderImpl):
"""A coder that always encodes exactly one value."""
@@ -349,10 +372,8 @@ class SingletonCoderImpl(CoderImpl):
def decode(self, encoded):
  """Returns the stored singleton value; the encoded bytes are not read."""
  singleton = self._value
  return singleton

def estimate_size(self, value, nested=False):
  """Always estimates 0 bytes, independent of the value and of nesting."""
  return 0
class AbstractComponentCoderImpl(StreamCoderImpl):
@@ -382,13 +403,13 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
return self._construct_from_components(
[c.decode_from_stream(in_stream, True) for c in self._coder_impls])
def estimate_size(self, value, nested=False):
  """Estimates the encoded size of the given value, in bytes.

  Delegates to get_estimated_size_and_observables and discards the list of
  observables it returns.

  Args:
    value: the value whose encoded size is to be estimated.
    nested: whether the value is encoded in a nested context. Forwarded to
      get_estimated_size_and_observables (previously it was dropped, so any
      override depending on nesting silently saw the default instead).

  Returns:
    The estimated encoded size in bytes.
  """
  estimated_size, _ = self.get_estimated_size_and_observables(
      value, nested=nested)
  return estimated_size
- def get_estimated_size_and_observables(self, value):
+ def get_estimated_size_and_observables(self, value, nested=False):
"""Returns estimated size of value along with any nested observables."""
values = self._extract_components(value)
estimated_size = 0
@@ -400,8 +421,8 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
else:
c = self._coder_impls[i] # type cast
child_size, child_observables = (
- c.get_estimated_size_and_observables(child_value))
- estimated_size += child_size + ESTIMATED_NESTED_OVERHEAD
+ c.get_estimated_size_and_observables(child_value, nested=True))
+ estimated_size += child_size
observables += child_observables
return estimated_size, observables
@@ -437,6 +458,29 @@ class SequenceCoderImpl(StreamCoderImpl):
[self._elem_coder.decode_from_stream(in_stream, True)
for _ in range(size)])
def estimate_size(self, value, nested=False):
  """Estimates the encoded size of the given sequence, in bytes.

  Delegates to get_estimated_size_and_observables and discards the list of
  observables it returns.

  Args:
    value: the sequence whose encoded size is to be estimated.
    nested: whether the sequence is encoded in a nested context. Forwarded to
      get_estimated_size_and_observables rather than dropped, matching how
      CoderImpl threads `nested` between these two methods.

  Returns:
    The estimated encoded size in bytes.
  """
  estimated_size, _ = self.get_estimated_size_and_observables(
      value, nested=nested)
  return estimated_size
def get_estimated_size_and_observables(self, value, nested=False):
  """Estimates a sequence's encoded size and collects nested observables.

  The estimate is 4 bytes for the element count plus each element's nested
  estimate from the element coder. Elements that are
  observable.ObservableMixin instances are not sized here; each is reported
  as an (element, element_coder) pair instead.

  Args:
    value: an iterable of elements.
    nested: accepted for interface compatibility; not used.

  Returns:
    A tuple (estimated_size, observables).
  """
  # The element count is stored as a 32-bit integer.
  total = 4
  found = []
  for item in value:
    if not isinstance(item, observable.ObservableMixin):
      item_size, item_observables = (
          self._elem_coder.get_estimated_size_and_observables(
              item, nested=True))
      total += item_size
      found.extend(item_observables)
    else:
      found.append((item, self._elem_coder))
  return total, found
class TupleSequenceCoderImpl(SequenceCoderImpl):
"""A coder for homogeneous tuple objects."""
@@ -464,7 +508,7 @@ class WindowedValueCoderImpl(StreamCoderImpl):
self._timestamp_coder.decode_from_stream(in_stream, True),
self._windows_coder.decode_from_stream(in_stream, True))
- def get_estimated_size_and_observables(self, value):
+ def get_estimated_size_and_observables(self, value, nested=False):
"""Returns estimated size of value along with any nested observables."""
estimated_size = 0
observables = []
@@ -473,9 +517,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
else:
c = self._value_coder # type cast
value_estimated_size, value_observables = (
- c.get_estimated_size_and_observables(value.value))
+ c.get_estimated_size_and_observables(value.value, nested=True))
estimated_size += value_estimated_size
observables += value_observables
- estimated_size += self._timestamp_coder.estimate_size(value.timestamp)
- estimated_size += self._windows_coder.estimate_size(value.windows)
+ estimated_size += (
+ self._timestamp_coder.estimate_size(value.timestamp, nested=True))
+ estimated_size += (
+ self._windows_coder.estimate_size(value.windows, nested=True))
return estimated_size, observables
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index d55e8c2..dd4a873 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -81,6 +81,8 @@ class CodersTest(unittest.TestCase):
for v in values:
self.assertEqual(v, coder.decode(coder.encode(v)))
self.assertEqual(coder.estimate_size(v),
+ len(coder.encode(v)))
+ self.assertEqual(coder.estimate_size(v),
coder.get_impl().estimate_size(v))
self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v),
(coder.get_impl().estimate_size(v), []))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 71d5e08..ceb1f0d 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -137,3 +137,18 @@ class InputStream(object):
def read_bigendian_double(self):
  """Reads eight bytes and decodes them as a big-endian double."""
  raw = self.read(8)
  (result,) = struct.unpack('>d', raw)
  return result
+
+
def get_varint_size(v):
  """Returns the size of the given integer value when encoded as a VarInt.

  Each VarInt byte carries 7 payload bits, so the result is the number of
  7-bit groups needed for the value, with a minimum of one byte (for 0).
  Negative values are first mapped to their unsigned 64-bit two's-complement
  form, so any negative value in the 64-bit range takes 10 bytes.

  Args:
    v: the integer to size.

  Returns:
    The number of bytes in the VarInt encoding of v.

  Raises:
    ValueError: if v is -2**64 or smaller (not representable in 64 bits).
  """
  if v < 0:
    v += 1 << 64
    # The range check belongs to the negative branch only: applied
    # unconditionally it would wrongly reject 0, which encodes in one byte.
    if v <= 0:
      raise ValueError('Value too large (negative).')
  varint_size = 1
  while v >= 0x80:
    v >>= 7
    varint_size += 1
  return varint_size
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
index eb39bdf..16ea5d4 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -59,3 +59,5 @@ cdef class InputStream(object):
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
cpdef double read_bigendian_double(self) except? -1
cpdef bytes read_all(self, bint nested=*)
+
+cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 1add4cb..cde900f 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -202,3 +202,14 @@ cdef class InputStream(object):
cpdef double read_bigendian_double(self) except? -1:
  """Reads a big-endian 64-bit value and returns its bits as a C double."""
  # Read the 8 bytes as a signed 64-bit integer first, then reinterpret (not
  # numerically convert) that same bit pattern as a double via a pointer cast.
  cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
  return (<double*><char*>&as_long)[0]
+
cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
  """Returns the number of bytes `value` occupies when encoded as a VarInt.

  The value is reinterpreted as an unsigned 64-bit integer, so every
  negative input reports the full 10-byte width.
  """
  cdef libc.stdint.uint64_t remaining = value
  cdef libc.stdint.int64_t varint_size = 1
  # Each VarInt byte holds 7 payload bits; add a byte while more remain.
  while remaining >= 0x80:
    remaining >>= 7
    varint_size += 1
  return varint_size