You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/09/15 00:35:02 UTC

[beam] branch master updated: Optimize varint reading and writing for small ints. (#23192)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c94b20f74 Optimize varint reading and writing for small ints. (#23192)
26c94b20f74 is described below

commit 26c94b20f74c7d7d64ca64522953cf4262786bbc
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Wed Sep 14 17:34:48 2022 -0700

    Optimize varint reading and writing for small ints. (#23192)
    
    Varint reading and writing is used as a primitive by several other coders.
    
    Before:
    
    tiny_row, RowCoder, 1000 element(s)    : p. element median time cost: 7.22051e-07 sec, relative std: 12.45%
    large_row, RowCoder, 1000 element(s)   : p. element median time cost: 3.12304e-06 sec, relative std: 5.01%
    nullable_row, RowCoder, 1000 element(s): p. element median time cost: 3.27158e-06 sec, relative std: 6.49%
    diverse_row, RowCoder, 1000 element(s) : p. element median time cost: 2.33555e-06 sec, relative std: 7.48%
    small_int, FastPrimitivesCoder, 1000 element(s)                   : p. element median time cost: 1.66655e-07 sec, relative std: 9.18%
    large_int, FastPrimitivesCoder, 1000 element(s)                   : p. element median time cost: 3.39985e-07 sec, relative std: 3.05%
    small_int, LengthPrefixCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 5.70536e-07 sec, relative std: 4.27%
    small_list, FastPrimitivesCoder, 1000 element(s)               : p. element median time cost: 1.57607e-06 sec, relative std: 3.05%
    large_list, FastPrimitivesCoder, 1000 element(s)               : p. element median time cost: 4.76551e-05 sec, relative std: 4.31%
    small_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 1.59943e-06 sec, relative std: 2.11%
    large_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 4.66548e-05 sec, relative std: 3.59%
    
    After:
    
    tiny_row, RowCoder, 1000 element(s)    : p. element median time cost: 6.68406e-07 sec, relative std: 13.95%
    large_row, RowCoder, 1000 element(s)   : p. element median time cost: 2.88701e-06 sec, relative std: 6.26%
    nullable_row, RowCoder, 1000 element(s): p. element median time cost: 3.19052e-06 sec, relative std: 8.06%
    diverse_row, RowCoder, 1000 element(s) : p. element median time cost: 2.24805e-06 sec, relative std: 6.15%
    small_int, FastPrimitivesCoder, 1000 element(s)                   : p. element median time cost: 1.65582e-07 sec, relative std: 13.75%
    large_int, FastPrimitivesCoder, 1000 element(s)                   : p. element median time cost: 3.14236e-07 sec, relative std: 6.83%
    small_int, LengthPrefixCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 5.85556e-07 sec, relative std: 4.98%
    small_list, FastPrimitivesCoder, 1000 element(s)               : p. element median time cost: 1.48404e-06 sec, relative std: 42.11%
    large_list, FastPrimitivesCoder, 1000 element(s)               : p. element median time cost: 4.77622e-05 sec, relative std: 4.20%
    small_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 1.50216e-06 sec, relative std: 1.90%
    large_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 4.57406e-05 sec, relative std: 3.11%
---
 sdks/python/apache_beam/coders/stream.pyx | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 2a6451cb29e..14536b007cc 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -59,6 +59,12 @@ cdef class OutputStream(object):
   cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
     """Encode a long using variable-length encoding to a stream."""
     cdef libc.stdint.uint64_t v = signed_v
+    # Inline common case.
+    if v <= 0x7F and self.pos < self.buffer_size - 1:
+      self.data[self.pos] = v
+      self.pos += 1
+      return
+
     cdef long bits
     while True:
       bits = v & 0x7F
@@ -136,6 +142,9 @@ cdef class ByteCountingOutputStream(OutputStream):
       self.write_var_int64(blen)
     self.count += blen
 
+  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
+    self.count += get_varint_size(signed_v)
+
   cpdef write_byte(self, unsigned char _):
     self.count += 1
 
@@ -183,15 +192,16 @@ cdef class InputStream(object):
 
   cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
     """Decode a variable-length encoded long from a stream."""
-    cdef long byte
+    # Inline common case.
+    cdef long byte = <unsigned char> self.allc[self.pos]
+    self.pos += 1
+    if byte <= 0x7F:
+      return byte
+
     cdef libc.stdint.int64_t bits
     cdef long shift = 0
     cdef libc.stdint.int64_t result = 0
     while True:
-      byte = self.read_byte()
-      if byte < 0:
-        raise RuntimeError('VarInt not terminated.')
-
       bits = byte & 0x7F
       if (shift >= sizeof(libc.stdint.int64_t) * 8 or
           (shift >= (sizeof(libc.stdint.int64_t) * 8 - 1) and bits > 1)):
@@ -200,6 +210,10 @@ cdef class InputStream(object):
       shift += 7
       if not (byte & 0x80):
         break
+      byte = self.read_byte()
+      if byte < 0:
+        raise RuntimeError('VarInt not terminated.')
+
     return result
 
   cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1: