You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/09/15 00:35:02 UTC
[beam] branch master updated: Optimize varint reading and writing for small ints. (#23192)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26c94b20f74 Optimize varint reading and writing for small ints. (#23192)
26c94b20f74 is described below
commit 26c94b20f74c7d7d64ca64522953cf4262786bbc
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Wed Sep 14 17:34:48 2022 -0700
Optimize varint reading and writing for small ints. (#23192)
This is used as a primitive for several other coders.
Before:
tiny_row, RowCoder, 1000 element(s) : p. element median time cost: 7.22051e-07 sec, relative std: 12.45%
large_row, RowCoder, 1000 element(s) : p. element median time cost: 3.12304e-06 sec, relative std: 5.01%
nullable_row, RowCoder, 1000 element(s): p. element median time cost: 3.27158e-06 sec, relative std: 6.49%
diverse_row, RowCoder, 1000 element(s) : p. element median time cost: 2.33555e-06 sec, relative std: 7.48%
small_int, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 1.66655e-07 sec, relative std: 9.18%
large_int, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 3.39985e-07 sec, relative std: 3.05%
small_int, LengthPrefixCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 5.70536e-07 sec, relative std: 4.27%
small_list, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 1.57607e-06 sec, relative std: 3.05%
large_list, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 4.76551e-05 sec, relative std: 4.31%
small_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 1.59943e-06 sec, relative std: 2.11%
large_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 4.66548e-05 sec, relative std: 3.59%
After:
tiny_row, RowCoder, 1000 element(s) : p. element median time cost: 6.68406e-07 sec, relative std: 13.95%
large_row, RowCoder, 1000 element(s) : p. element median time cost: 2.88701e-06 sec, relative std: 6.26%
nullable_row, RowCoder, 1000 element(s): p. element median time cost: 3.19052e-06 sec, relative std: 8.06%
diverse_row, RowCoder, 1000 element(s) : p. element median time cost: 2.24805e-06 sec, relative std: 6.15%
small_int, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 1.65582e-07 sec, relative std: 13.75%
large_int, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 3.14236e-07 sec, relative std: 6.83%
small_int, LengthPrefixCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 5.85556e-07 sec, relative std: 4.98%
small_list, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 1.48404e-06 sec, relative std: 42.11%
large_list, FastPrimitivesCoder, 1000 element(s) : p. element median time cost: 4.77622e-05 sec, relative std: 4.20%
small_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 1.50216e-06 sec, relative std: 1.90%
large_list, IterableCoder[FastPrimitivesCoder], 1000 element(s): p. element median time cost: 4.57406e-05 sec, relative std: 3.11%
---
sdks/python/apache_beam/coders/stream.pyx | 24 +++++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 2a6451cb29e..14536b007cc 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -59,6 +59,12 @@ cdef class OutputStream(object):
cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
"""Encode a long using variable-length encoding to a stream."""
cdef libc.stdint.uint64_t v = signed_v
+ # Inline common case.
+ if v <= 0x7F and self.pos < self.buffer_size - 1:
+ self.data[self.pos] = v
+ self.pos += 1
+ return
+
cdef long bits
while True:
bits = v & 0x7F
@@ -136,6 +142,9 @@ cdef class ByteCountingOutputStream(OutputStream):
self.write_var_int64(blen)
self.count += blen
+ cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
+ self.count += get_varint_size(signed_v)
+
cpdef write_byte(self, unsigned char _):
self.count += 1
@@ -183,15 +192,16 @@ cdef class InputStream(object):
cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
"""Decode a variable-length encoded long from a stream."""
- cdef long byte
+ # Inline common case.
+ cdef long byte = <unsigned char> self.allc[self.pos]
+ self.pos += 1
+ if byte <= 0x7F:
+ return byte
+
cdef libc.stdint.int64_t bits
cdef long shift = 0
cdef libc.stdint.int64_t result = 0
while True:
- byte = self.read_byte()
- if byte < 0:
- raise RuntimeError('VarInt not terminated.')
-
bits = byte & 0x7F
if (shift >= sizeof(libc.stdint.int64_t) * 8 or
(shift >= (sizeof(libc.stdint.int64_t) * 8 - 1) and bits > 1)):
@@ -200,6 +210,10 @@ cdef class InputStream(object):
shift += 7
if not (byte & 0x80):
break
+ byte = self.read_byte()
+ if byte < 0:
+ raise RuntimeError('VarInt not terminated.')
+
return result
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1: