You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/19 00:50:18 UTC
[1/4] incubator-beam git commit: Closes #683
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 69f895a2e -> b00f915ee
Closes #683
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b00f915e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b00f915e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b00f915e
Branch: refs/heads/python-sdk
Commit: b00f915eeeb5bca9addda2a9a5ff500e81954d80
Parents: 69f895a 4c51273
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 17:48:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 10 +++
sdks/python/apache_beam/coders/coder_impl.py | 80 ++++++++++++++++++++
sdks/python/apache_beam/coders/coders.py | 43 +++++++++++
.../apache_beam/coders/coders_test_common.py | 9 +++
sdks/python/apache_beam/coders/typecoders.py | 4 +-
5 files changed, 144 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[4/4] incubator-beam git commit: Implement coder optimized for coding
primitives.
Posted by ro...@apache.org.
Implement coder optimized for coding primitives.
This coder falls back to another coder (e.g. PickleCoder)
for other types, but is much faster for ints, strings, etc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f58c2bda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f58c2bda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f58c2bda
Branch: refs/heads/python-sdk
Commit: f58c2bdaae241718e0b4e250a90304317445613f
Parents: 69f895a
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 12 15:58:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 8 +++
sdks/python/apache_beam/coders/coder_impl.py | 64 ++++++++++++++++++++
sdks/python/apache_beam/coders/coders.py | 43 +++++++++++++
.../apache_beam/coders/coders_test_common.py | 8 +++
4 files changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 0c92c5b..ee1590d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -59,6 +59,14 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
cdef bint _check_safe(self, value) except -1
+cdef object NoneType
+cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE
+cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+
+cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
+ cdef CoderImpl fallback_coder_impl
+
+
cdef class BytesCoderImpl(CoderImpl):
pass
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index ca64d9c..473cff5 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -26,6 +26,7 @@ coder_impl.pxd file for type hints.
"""
import collections
+from types import NoneType
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -147,6 +148,69 @@ class DeterministicPickleCoderImpl(CoderImpl):
return self._pickle_coder.decode(encoded)
+UNKNOWN_TYPE = 0xFF
+NONE_TYPE = 0
+INT_TYPE = 1
+FLOAT_TYPE = 2
+STR_TYPE = 3
+UNICODE_TYPE = 4
+LIST_TYPE = 5
+TUPLE_TYPE = 6
+
+
+class FastPrimitivesCoderImpl(StreamCoderImpl):
+
+ def __init__(self, fallback_coder_impl):
+ self.fallback_coder_impl = fallback_coder_impl
+
+ def encode_to_stream(self, value, stream, nested):
+ t = type(value)
+ if t is NoneType:
+ stream.write_byte(NONE_TYPE)
+ elif t is int:
+ stream.write_byte(INT_TYPE)
+ stream.write_var_int64(value)
+ elif t is float:
+ stream.write_byte(FLOAT_TYPE)
+ stream.write_bigendian_double(value)
+ elif t is str:
+ stream.write_byte(STR_TYPE)
+ stream.write(value, nested)
+ elif t is unicode:
+ stream.write_byte(UNICODE_TYPE)
+ stream.write(value.encode('utf-8'), nested)
+ elif t is list or t is tuple:
+ stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
+ stream.write_var_int64(len(value))
+ for e in value:
+ self.encode_to_stream(e, stream, True)
+ else:
+ stream.write_byte(UNKNOWN_TYPE)
+ self.fallback_coder_impl.encode_to_stream(value, stream, nested)
+
+ def decode_from_stream(self, stream, nested):
+ t = stream.read_byte()
+ if t == NONE_TYPE:
+ return None
+ elif t == INT_TYPE:
+ return stream.read_var_int64()
+ elif t == FLOAT_TYPE:
+ return stream.read_bigendian_double()
+ elif t == STR_TYPE:
+ return stream.read_all(nested)
+ elif t == UNICODE_TYPE:
+ return stream.read_all(nested).decode('utf-8')
+ elif t == LIST_TYPE or t == TUPLE_TYPE:
+ vlen = stream.read_var_int64()
+ vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)]
+ if t == LIST_TYPE:
+ return vlist
+ else:
+ return tuple(vlist)
+ else:
+ return self.fallback_coder_impl.decode_from_stream(stream, nested)
+
+
class BytesCoderImpl(CoderImpl):
"""A coder for bytes/str objects."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 619586f..cf5ca6d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder):
return self
+class FastPrimitivesCoder(FastCoder):
+ """Encodes simple primitives (e.g. str, int) efficiently.
+
+ For unknown types, falls back to another coder (e.g. PickleCoder).
+ """
+ def __init__(self, fallback_coder):
+ self._fallback_coder = fallback_coder
+
+ def _create_impl(self):
+ return coder_impl.FastPrimitivesCoderImpl(
+ self._fallback_coder.get_impl())
+
+ def is_deterministic(self):
+ return self._fallback_coder.is_deterministic()
+
+ def as_cloud_object(self, is_pair_like=True):
+ value = super(FastCoder, self).as_cloud_object()
+ # We currently use this coder in places where we cannot infer the coder to
+ # use for the value type in a more granular way. In places where the
+ # service expects a pair, it checks for the "is_pair_like" key, in which
+ # case we would fail without the hack below.
+ if is_pair_like:
+ value['is_pair_like'] = True
+ value['component_encodings'] = [
+ self.as_cloud_object(is_pair_like=False),
+ self.as_cloud_object(is_pair_like=False)
+ ]
+
+ return value
+
+ # We allow .key_coder() and .value_coder() to be called on this coder since
+ # we can't always infer the return values of lambdas in ParDo operations, the
+ # result of which may be used in a GroupByKey.
+ def is_kv_coder(self):
+ return True
+
+ def key_coder(self):
+ return self
+
+ def value_coder(self):
+ return self
+
+
class Base64PickleCoder(Coder):
"""Coder of objects by Python pickle, then base64 encoding."""
# TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 0266fdc..b084947 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase):
coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
(1, cell_value))
+ def test_fast_primitives_coder(self):
+ coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len))
+ self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
+ self.check_coder(coder, (), (1, 2, 3))
+ self.check_coder(coder, [], [1, 2, 3])
+ self.check_coder(coder, len)
+ self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
+
def test_bytes_coder(self):
self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
[3/4] incubator-beam git commit: Add fast support for dicts for
default coder.
Posted by ro...@apache.org.
Add fast support for dicts for default coder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c512732
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c512732
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c512732
Branch: refs/heads/python-sdk
Commit: 4c512732bb29b840622df4bab892a4afa2731997
Parents: 64457d0
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 15:07:53 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 4 +++-
sdks/python/apache_beam/coders/coder_impl.py | 18 +++++++++++++++++-
.../apache_beam/coders/coders_test_common.py | 1 +
3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index ee1590d..ae1caa4 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -61,10 +61,12 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
cdef object NoneType
cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE
-cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE
cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef CoderImpl fallback_coder_impl
+ @cython.locals(unicode_value=unicode, dict_value=dict)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
cdef class BytesCoderImpl(CoderImpl):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 473cff5..433fd81 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -156,6 +156,7 @@ STR_TYPE = 3
UNICODE_TYPE = 4
LIST_TYPE = 5
TUPLE_TYPE = 6
+DICT_TYPE = 7
class FastPrimitivesCoderImpl(StreamCoderImpl):
@@ -177,13 +178,21 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
stream.write_byte(STR_TYPE)
stream.write(value, nested)
elif t is unicode:
+ unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
- stream.write(value.encode('utf-8'), nested)
+ stream.write(unicode_value.encode('utf-8'), nested)
elif t is list or t is tuple:
stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
stream.write_var_int64(len(value))
for e in value:
self.encode_to_stream(e, stream, True)
+ elif t is dict:
+ dict_value = value # for typing
+ stream.write_byte(DICT_TYPE)
+ stream.write_var_int64(len(value))
+ for k, v in dict_value.iteritems():
+ self.encode_to_stream(k, stream, True)
+ self.encode_to_stream(v, stream, True)
else:
stream.write_byte(UNKNOWN_TYPE)
self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -207,6 +216,13 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
return vlist
else:
return tuple(vlist)
+ elif t == DICT_TYPE:
+ vlen = stream.read_var_int64()
+ v = {}
+ for _ in range(vlen):
+ k = self.decode_from_stream(stream, True)
+ v[k] = self.decode_from_stream(stream, True)
+ return v
else:
return self.fallback_coder_impl.decode_from_stream(stream, nested)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index b084947..deb5652 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -116,6 +116,7 @@ class CodersTest(unittest.TestCase):
self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
self.check_coder(coder, (), (1, 2, 3))
self.check_coder(coder, [], [1, 2, 3])
+ self.check_coder(coder, dict(), {'a': 'b'}, {0: dict(), 1: len})
self.check_coder(coder, len)
self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
[2/4] incubator-beam git commit: Used fast primitives coder as
fallback coder.
Posted by ro...@apache.org.
Used fast primitives coder as fallback coder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64457d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64457d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64457d04
Branch: refs/heads/python-sdk
Commit: 64457d04f60859c7f0ef53c9d847090a9fa754e4
Parents: f58c2bd
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 14:25:08 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 2 +-
sdks/python/apache_beam/coders/typecoders.py | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cf5ca6d..1c5043c 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -359,7 +359,7 @@ class FastPrimitivesCoder(FastCoder):
For unknown types, falls back to another coder (e.g. PickleCoder).
"""
- def __init__(self, fallback_coder):
+ def __init__(self, fallback_coder=PickleCoder()):
self._fallback_coder = fallback_coder
def _create_impl(self):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 2fba752..74e5770 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -86,9 +86,9 @@ class CoderRegistry(object):
self._register_coder_internal(bytes, coders.BytesCoder)
self._register_coder_internal(unicode, coders.StrUtf8Coder)
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+ self._fallback_coder = fallback_coder or coders.FastPrimitivesCoder
self._register_coder_internal(typehints.AnyTypeConstraint,
- coders.PickleCoder)
- self._fallback_coder = fallback_coder or coders.PickleCoder
+ self._fallback_coder)
def _register_coder_internal(self, typehint_type, typehint_coder_class):
self._coders[typehint_type] = typehint_coder_class