You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/19 00:50:20 UTC

[3/4] incubator-beam git commit: Add fast support for dicts for default coder.

Add fast support for dicts for default coder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c512732
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c512732
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c512732

Branch: refs/heads/python-sdk
Commit: 4c512732bb29b840622df4bab892a4afa2731997
Parents: 64457d0
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 15:07:53 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd     |  4 +++-
 sdks/python/apache_beam/coders/coder_impl.py      | 18 +++++++++++++++++-
 .../apache_beam/coders/coders_test_common.py      |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index ee1590d..ae1caa4 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -61,10 +61,12 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
 
 cdef object NoneType
 cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE
-cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
+  @cython.locals(unicode_value=unicode, dict_value=dict)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
 cdef class BytesCoderImpl(CoderImpl):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 473cff5..433fd81 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -156,6 +156,7 @@ STR_TYPE = 3
 UNICODE_TYPE = 4
 LIST_TYPE = 5
 TUPLE_TYPE = 6
+DICT_TYPE = 7
 
 
 class FastPrimitivesCoderImpl(StreamCoderImpl):
@@ -177,13 +178,21 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
       stream.write_byte(STR_TYPE)
       stream.write(value, nested)
     elif t is unicode:
+      unicode_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
-      stream.write(value.encode('utf-8'), nested)
+      stream.write(unicode_value.encode('utf-8'), nested)
     elif t is list or t is tuple:
       stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
       stream.write_var_int64(len(value))
       for e in value:
         self.encode_to_stream(e, stream, True)
+    elif t is dict:
+      dict_value = value  # for typing
+      stream.write_byte(DICT_TYPE)
+      stream.write_var_int64(len(value))
+      for k, v in dict_value.iteritems():
+        self.encode_to_stream(k, stream, True)
+        self.encode_to_stream(v, stream, True)
     else:
       stream.write_byte(UNKNOWN_TYPE)
       self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -207,6 +216,13 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
         return vlist
       else:
         return tuple(vlist)
+    elif t == DICT_TYPE:
+      vlen = stream.read_var_int64()
+      v = {}
+      for _ in range(vlen):
+        k = self.decode_from_stream(stream, True)
+        v[k] = self.decode_from_stream(stream, True)
+      return v
     else:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c512732/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index b084947..deb5652 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -116,6 +116,7 @@ class CodersTest(unittest.TestCase):
     self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
     self.check_coder(coder, (), (1, 2, 3))
     self.check_coder(coder, [], [1, 2, 3])
+    self.check_coder(coder, dict(), {'a': 'b'}, {0: dict(), 1: len})
     self.check_coder(coder, len)
     self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))