You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/19 00:50:18 UTC

[1/4] incubator-beam git commit: Closes #683

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 69f895a2e -> b00f915ee


Closes #683


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b00f915e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b00f915e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b00f915e

Branch: refs/heads/python-sdk
Commit: b00f915eeeb5bca9addda2a9a5ff500e81954d80
Parents: 69f895a 4c51273
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 17:48:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   | 10 +++
 sdks/python/apache_beam/coders/coder_impl.py    | 80 ++++++++++++++++++++
 sdks/python/apache_beam/coders/coders.py        | 43 +++++++++++
 .../apache_beam/coders/coders_test_common.py    |  9 +++
 sdks/python/apache_beam/coders/typecoders.py    |  4 +-
 5 files changed, 144 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[4/4] incubator-beam git commit: Implement coder optimized for coding primitives.

Posted by ro...@apache.org.
Implement coder optimized for coding primitives.

This coder falls back to another coder (e.g. PickleCoder)
for other types, but is much faster for ints, strings, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f58c2bda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f58c2bda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f58c2bda

Branch: refs/heads/python-sdk
Commit: f58c2bdaae241718e0b4e250a90304317445613f
Parents: 69f895a
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 12 15:58:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   |  8 +++
 sdks/python/apache_beam/coders/coder_impl.py    | 64 ++++++++++++++++++++
 sdks/python/apache_beam/coders/coders.py        | 43 +++++++++++++
 .../apache_beam/coders/coders_test_common.py    |  8 +++
 4 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 0c92c5b..ee1590d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -59,6 +59,14 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
   cdef bint _check_safe(self, value) except -1
 
 
+cdef object NoneType
+cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE
+cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+
+cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
+  cdef CoderImpl fallback_coder_impl
+
+
 cdef class BytesCoderImpl(CoderImpl):
   pass
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index ca64d9c..473cff5 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -26,6 +26,7 @@ coder_impl.pxd file for type hints.
 """
 
 import collections
+from types import NoneType
 
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -147,6 +148,69 @@ class DeterministicPickleCoderImpl(CoderImpl):
     return self._pickle_coder.decode(encoded)
 
 
+UNKNOWN_TYPE = 0xFF
+NONE_TYPE = 0
+INT_TYPE = 1
+FLOAT_TYPE = 2
+STR_TYPE = 3
+UNICODE_TYPE = 4
+LIST_TYPE = 5
+TUPLE_TYPE = 6
+
+
+class FastPrimitivesCoderImpl(StreamCoderImpl):
+
+  def __init__(self, fallback_coder_impl):
+    self.fallback_coder_impl = fallback_coder_impl
+
+  def encode_to_stream(self, value, stream, nested):
+    t = type(value)
+    if t is NoneType:
+      stream.write_byte(NONE_TYPE)
+    elif t is int:
+      stream.write_byte(INT_TYPE)
+      stream.write_var_int64(value)
+    elif t is float:
+      stream.write_byte(FLOAT_TYPE)
+      stream.write_bigendian_double(value)
+    elif t is str:
+      stream.write_byte(STR_TYPE)
+      stream.write(value, nested)
+    elif t is unicode:
+      stream.write_byte(UNICODE_TYPE)
+      stream.write(value.encode('utf-8'), nested)
+    elif t is list or t is tuple:
+      stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
+      stream.write_var_int64(len(value))
+      for e in value:
+        self.encode_to_stream(e, stream, True)
+    else:
+      stream.write_byte(UNKNOWN_TYPE)
+      self.fallback_coder_impl.encode_to_stream(value, stream, nested)
+
+  def decode_from_stream(self, stream, nested):
+    t = stream.read_byte()
+    if t == NONE_TYPE:
+      return None
+    elif t == INT_TYPE:
+      return stream.read_var_int64()
+    elif t == FLOAT_TYPE:
+      return stream.read_bigendian_double()
+    elif t == STR_TYPE:
+      return stream.read_all(nested)
+    elif t == UNICODE_TYPE:
+      return stream.read_all(nested).decode('utf-8')
+    elif t == LIST_TYPE or t == TUPLE_TYPE:
+      vlen = stream.read_var_int64()
+      vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)]
+      if t == LIST_TYPE:
+        return vlist
+      else:
+        return tuple(vlist)
+    else:
+      return self.fallback_coder_impl.decode_from_stream(stream, nested)
+
+
 class BytesCoderImpl(CoderImpl):
   """A coder for bytes/str objects."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 619586f..cf5ca6d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder):
     return self
 
 
+class FastPrimitivesCoder(FastCoder):
+  """Encodes simple primitives (e.g. str, int) efficiently.
+
+  For unknown types, falls back to another coder (e.g. PickleCoder).
+  """
+  def __init__(self, fallback_coder):
+    self._fallback_coder = fallback_coder
+
+  def _create_impl(self):
+    return coder_impl.FastPrimitivesCoderImpl(
+        self._fallback_coder.get_impl())
+
+  def is_deterministic(self):
+    return self._fallback_coder.is_deterministic()
+
+  def as_cloud_object(self, is_pair_like=True):
+    value = super(FastCoder, self).as_cloud_object()
+    # We currently use this coder in places where we cannot infer the coder to
+    # use for the value type in a more granular way.  In places where the
+    # service expects a pair, it checks for the "is_pair_like" key, in which
+    # case we would fail without the hack below.
+    if is_pair_like:
+      value['is_pair_like'] = True
+      value['component_encodings'] = [
+          self.as_cloud_object(is_pair_like=False),
+          self.as_cloud_object(is_pair_like=False)
+      ]
+
+    return value
+
+  # We allow .key_coder() and .value_coder() to be called on this coder since
+  # we can't always infer the return values of lambdas in ParDo operations, the
+  # result of which may be used in a GroupByKey.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
 class Base64PickleCoder(Coder):
   """Coder of objects by Python pickle, then base64 encoding."""
   # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 0266fdc..b084947 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase):
         coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
         (1, cell_value))
 
+  def test_fast_primitives_coder(self):
+    coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len))
+    self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
+    self.check_coder(coder, (), (1, 2, 3))
+    self.check_coder(coder, [], [1, 2, 3])
+    self.check_coder(coder, len)
+    self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
+
   def test_bytes_coder(self):
     self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
 


[3/4] incubator-beam git commit: Add fast support for dicts to the default coder.

Posted by ro...@apache.org.
Add fast support for dicts to the default coder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c512732
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c512732
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c512732

Branch: refs/heads/python-sdk
Commit: 4c512732bb29b840622df4bab892a4afa2731997
Parents: 64457d0
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 15:07:53 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd     |  4 +++-
 sdks/python/apache_beam/coders/coder_impl.py      | 18 +++++++++++++++++-
 .../apache_beam/coders/coders_test_common.py      |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index ee1590d..ae1caa4 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -61,10 +61,12 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
 
 cdef object NoneType
 cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE
-cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
+  @cython.locals(unicode_value=unicode, dict_value=dict)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
 cdef class BytesCoderImpl(CoderImpl):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 473cff5..433fd81 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -156,6 +156,7 @@ STR_TYPE = 3
 UNICODE_TYPE = 4
 LIST_TYPE = 5
 TUPLE_TYPE = 6
+DICT_TYPE = 7
 
 
 class FastPrimitivesCoderImpl(StreamCoderImpl):
@@ -177,13 +178,21 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
       stream.write_byte(STR_TYPE)
       stream.write(value, nested)
     elif t is unicode:
+      unicode_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
-      stream.write(value.encode('utf-8'), nested)
+      stream.write(unicode_value.encode('utf-8'), nested)
     elif t is list or t is tuple:
       stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
       stream.write_var_int64(len(value))
       for e in value:
         self.encode_to_stream(e, stream, True)
+    elif t is dict:  # NOTE(review): entry order may be nondeterministic
+      dict_value = value  # for typing
+      stream.write_byte(DICT_TYPE)
+      stream.write_var_int64(len(value))
+      for k, v in dict_value.iteritems():
+        self.encode_to_stream(k, stream, True)
+        self.encode_to_stream(v, stream, True)
     else:
       stream.write_byte(UNKNOWN_TYPE)
       self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -207,6 +216,13 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
         return vlist
       else:
         return tuple(vlist)
+    elif t == DICT_TYPE:
+      vlen = stream.read_var_int64()
+      v = {}
+      for _ in range(vlen):
+        k = self.decode_from_stream(stream, True)
+        v[k] = self.decode_from_stream(stream, True)
+      return v
     else:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index b084947..deb5652 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -116,6 +116,7 @@ class CodersTest(unittest.TestCase):
     self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
     self.check_coder(coder, (), (1, 2, 3))
     self.check_coder(coder, [], [1, 2, 3])
+    self.check_coder(coder, dict(), {'a': 'b'}, {0: dict(), 1: len})
     self.check_coder(coder, len)
     self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
 


[2/4] incubator-beam git commit: Use fast primitives coder as the fallback coder.

Posted by ro...@apache.org.
Use fast primitives coder as the fallback coder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64457d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64457d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64457d04

Branch: refs/heads/python-sdk
Commit: 64457d04f60859c7f0ef53c9d847090a9fa754e4
Parents: f58c2bd
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 14:25:08 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py     | 2 +-
 sdks/python/apache_beam/coders/typecoders.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cf5ca6d..1c5043c 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -359,7 +359,7 @@ class FastPrimitivesCoder(FastCoder):
 
   For unknown types, falls back to another coder (e.g. PickleCoder).
   """
-  def __init__(self, fallback_coder):
+  def __init__(self, fallback_coder=PickleCoder()):
     self._fallback_coder = fallback_coder
 
   def _create_impl(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 2fba752..74e5770 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -86,9 +86,9 @@ class CoderRegistry(object):
     self._register_coder_internal(bytes, coders.BytesCoder)
     self._register_coder_internal(unicode, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+    self._fallback_coder = fallback_coder or coders.FastPrimitivesCoder
     self._register_coder_internal(typehints.AnyTypeConstraint,
-                                  coders.PickleCoder)
-    self._fallback_coder = fallback_coder or coders.PickleCoder
+                                  self._fallback_coder)
 
   def _register_coder_internal(self, typehint_type, typehint_coder_class):
     self._coders[typehint_type] = typehint_coder_class