You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/09 21:32:29 UTC
[1/2] incubator-beam git commit: Optimize WindowedValueCoder
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk ea642428f -> ec00c530c
Optimize WindowedValueCoder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80
Branch: refs/heads/python-sdk
Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351
Parents: ea64242
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Nov 1 16:12:19 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Nov 9 13:26:44 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 4 ++++
sdks/python/apache_beam/coders/coder_impl.py | 21 ++++++++++++++------
.../apache_beam/coders/coders_test_common.py | 8 ++------
3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index e021c2e..74444ff 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -26,6 +26,7 @@ cimport libc.stdlib
cimport libc.string
from .stream cimport InputStream, OutputStream
+from apache_beam.utils cimport windowed_value
cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
@@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
@cython.locals(c=CoderImpl)
cpdef get_estimated_size_and_observables(self, value, bint nested=?)
+
+ @cython.locals(wv=windowed_value.WindowedValue)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index d075814..47a837f 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -29,7 +29,7 @@ from types import NoneType
from apache_beam.coders import observable
from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import windowed_value
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl):
"""A coder for windowed values."""
def __init__(self, value_coder, timestamp_coder, window_coder):
+ # TODO(robertwb): Do we need the ability to customize timestamp_coder?
self._value_coder = value_coder
self._timestamp_coder = timestamp_coder
self._windows_coder = TupleSequenceCoderImpl(window_coder)
def encode_to_stream(self, value, out, nested):
- self._value_coder.encode_to_stream(value.value, out, True)
- self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
- self._windows_coder.encode_to_stream(value.windows, out, True)
+ wv = value # type cast
+ self._value_coder.encode_to_stream(wv.value, out, True)
+ if isinstance(self._timestamp_coder, TimestampCoderImpl):
+ # Avoid creation of Timestamp object.
+ out.write_bigendian_int64(wv.timestamp_micros)
+ else:
+ self._timestamp_coder.encode_to_stream(wv.timestamp, out, True)
+ self._windows_coder.encode_to_stream(wv.windows, out, True)
def decode_from_stream(self, in_stream, nested):
- return WindowedValue(
+ return windowed_value.create(
self._value_coder.decode_from_stream(in_stream, True),
- self._timestamp_coder.decode_from_stream(in_stream, True),
+ # Avoid creation of Timestamp object.
+ in_stream.read_bigendian_int64()
+ if isinstance(self._timestamp_coder, TimestampCoderImpl)
+ else self._timestamp_coder.decode_from_stream(in_stream, True).micros,
self._windows_coder.decode_from_stream(in_stream, True))
def get_estimated_size_and_observables(self, value, nested=False):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 1af8347..adeb6a5 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -26,6 +26,7 @@ import dill
import coders
import observable
from apache_beam.utils import timestamp
+from apache_beam.utils import windowed_value
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
@@ -237,12 +238,7 @@ class CodersTest(unittest.TestCase):
# Test nested WindowedValue observable.
coder = coders.WindowedValueCoder(iter_coder)
observ = FakeObservableIterator()
- try:
- value = coders.coder_impl.WindowedValue(observ)
- except TypeError:
- # We are running tests with a fake WindowedValue implementation so as to
- # not pull in the rest of the SDK.
- value = coders.coder_impl.WindowedValue(observ, 0, [])
+ value = windowed_value.WindowedValue(observ, 0, ())
self.assertEqual(
coder.get_impl().get_estimated_size_and_observables(value)[1],
[(observ, elem_coder.get_impl())])
[2/2] incubator-beam git commit: Closes #1253
Posted by ro...@apache.org.
Closes #1253
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec00c530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec00c530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec00c530
Branch: refs/heads/python-sdk
Commit: ec00c530c9a54ce61095214fcce7b69a7c653d95
Parents: ea64242 0c90fb8
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Nov 9 13:26:45 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Nov 9 13:26:45 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.pxd | 4 ++++
sdks/python/apache_beam/coders/coder_impl.py | 21 ++++++++++++++------
.../apache_beam/coders/coders_test_common.py | 8 ++------
3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------