You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/09 21:32:29 UTC

[1/2] incubator-beam git commit: Optimize WindowedValueCoder

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ea642428f -> ec00c530c


Optimize WindowedValueCoder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80

Branch: refs/heads/python-sdk
Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351
Parents: ea64242
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Nov 1 16:12:19 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Nov 9 13:26:44 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   |  4 ++++
 sdks/python/apache_beam/coders/coder_impl.py    | 21 ++++++++++++++------
 .../apache_beam/coders/coders_test_common.py    |  8 ++------
 3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index e021c2e..74444ff 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -26,6 +26,7 @@ cimport libc.stdlib
 cimport libc.string
 
 from .stream cimport InputStream, OutputStream
+from apache_beam.utils cimport windowed_value
 
 
 cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
@@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
 
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
+
+  @cython.locals(wv=windowed_value.WindowedValue)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index d075814..47a837f 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -29,7 +29,7 @@ from types import NoneType
 
 from apache_beam.coders import observable
 from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import windowed_value
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
 
   def __init__(self, value_coder, timestamp_coder, window_coder):
+    # TODO(robertwb): Do we need the ability to customize timestamp_coder?
     self._value_coder = value_coder
     self._timestamp_coder = timestamp_coder
     self._windows_coder = TupleSequenceCoderImpl(window_coder)
 
   def encode_to_stream(self, value, out, nested):
-    self._value_coder.encode_to_stream(value.value, out, True)
-    self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
-    self._windows_coder.encode_to_stream(value.windows, out, True)
+    wv = value  # type cast
+    self._value_coder.encode_to_stream(wv.value, out, True)
+    if isinstance(self._timestamp_coder, TimestampCoderImpl):
+      # Avoid creation of Timestamp object.
+      out.write_bigendian_int64(wv.timestamp_micros)
+    else:
+      self._timestamp_coder.encode_to_stream(wv.timestamp, out, True)
+    self._windows_coder.encode_to_stream(wv.windows, out, True)
 
   def decode_from_stream(self, in_stream, nested):
-    return WindowedValue(
+    return windowed_value.create(
         self._value_coder.decode_from_stream(in_stream, True),
-        self._timestamp_coder.decode_from_stream(in_stream, True),
+        # Avoid creation of Timestamp object.
+        in_stream.read_bigendian_int64()
+        if isinstance(self._timestamp_coder, TimestampCoderImpl)
+        else self._timestamp_coder.decode_from_stream(in_stream, True).micros,
         self._windows_coder.decode_from_stream(in_stream, True))
 
   def get_estimated_size_and_observables(self, value, nested=False):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 1af8347..adeb6a5 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -26,6 +26,7 @@ import dill
 import coders
 import observable
 from apache_beam.utils import timestamp
+from apache_beam.utils import windowed_value
 
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 
@@ -237,12 +238,7 @@ class CodersTest(unittest.TestCase):
     # Test nested WindowedValue observable.
     coder = coders.WindowedValueCoder(iter_coder)
     observ = FakeObservableIterator()
-    try:
-      value = coders.coder_impl.WindowedValue(observ)
-    except TypeError:
-      # We are running tests with a fake WindowedValue implementation so as to
-      # not pull in the rest of the SDK.
-      value = coders.coder_impl.WindowedValue(observ, 0, [])
+    value = windowed_value.WindowedValue(observ, 0, ())
     self.assertEqual(
         coder.get_impl().get_estimated_size_and_observables(value)[1],
         [(observ, elem_coder.get_impl())])


[2/2] incubator-beam git commit: Closes #1253

Posted by ro...@apache.org.
Closes #1253


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec00c530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec00c530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec00c530

Branch: refs/heads/python-sdk
Commit: ec00c530c9a54ce61095214fcce7b69a7c653d95
Parents: ea64242 0c90fb8
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Nov 9 13:26:45 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Nov 9 13:26:45 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   |  4 ++++
 sdks/python/apache_beam/coders/coder_impl.py    | 21 ++++++++++++++------
 .../apache_beam/coders/coders_test_common.py    |  8 ++------
 3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------