You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/31 16:41:25 UTC

[1/3] incubator-beam git commit: Implement size observation for FastPrimitivesCoderImpl.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 03c3c7074 -> 979e299c7


Implement size observation for FastPrimitivesCoderImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fba0e91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fba0e91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fba0e91b

Branch: refs/heads/python-sdk
Commit: fba0e91b18c542def24e560e44484fc825feeca6
Parents: 4ffdd88
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Oct 28 15:35:15 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:34 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba0e91b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index b31e493..c73dd31 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -214,6 +214,15 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
   def __init__(self, fallback_coder_impl):
     self.fallback_coder_impl = fallback_coder_impl
 
+  def get_estimated_size_and_observables(self, value, nested=False):
+    if isinstance(value, observable.ObservableMixin):
+      # FastPrimitivesCoderImpl can presumably encode the elements too.
+      return 1, [(value, self)]
+    else:
+      out = ByteCountingOutputStream()
+      self.encode_to_stream(value, out, nested)
+      return out.get_count(), []
+
   def encode_to_stream(self, value, stream, nested):
     t = type(value)
     if t is NoneType:


[2/3] incubator-beam git commit: Several fixes to coder size estimates.

Posted by ro...@apache.org.
Several fixes to coder size estimates.

The coder returned alongside the observable must be the coder
of the observable's elements, not the coder of the observable
itself.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4ffdd889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4ffdd889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4ffdd889

Branch: refs/heads/python-sdk
Commit: 4ffdd8894195e1cd390c651b0f71ed91ccd8b7b7
Parents: 03c3c70
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Oct 28 13:49:50 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:34 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 41 +++++++++-----------
 .../apache_beam/coders/coders_test_common.py    |  7 ++--
 sdks/python/apache_beam/coders/observable.py    |  6 ++-
 3 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 606d51c..b31e493 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -415,6 +415,7 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
 
   def estimate_size(self, value, nested=False):
     """Estimates the encoded size of the given value, in bytes."""
+    # TODO(ccy): This ignores sizes of observable components.
     estimated_size, _ = (
         self.get_estimated_size_and_observables(value))
     return estimated_size
@@ -425,15 +426,11 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
     estimated_size = 0
     observables = []
     for i in range(0, len(self._coder_impls)):
-      child_value = values[i]
-      if isinstance(child_value, observable.ObservableMixin):
-        observables.append((child_value, self._coder_impls[i]))
-      else:
-        c = self._coder_impls[i]  # type cast
-        child_size, child_observables = (
-            c.get_estimated_size_and_observables(child_value, nested=True))
-        estimated_size += child_size
-        observables += child_observables
+      c = self._coder_impls[i]  # type cast
+      child_size, child_observables = (
+          c.get_estimated_size_and_observables(values[i], nested=True))
+      estimated_size += child_size
+      observables += child_observables
     return estimated_size, observables
 
 
@@ -470,6 +467,7 @@ class SequenceCoderImpl(StreamCoderImpl):
 
   def estimate_size(self, value, nested=False):
     """Estimates the encoded size of the given value, in bytes."""
+    # TODO(ccy): This ignores element sizes.
     estimated_size, _ = (
         self.get_estimated_size_and_observables(value))
     return estimated_size
@@ -477,19 +475,19 @@ class SequenceCoderImpl(StreamCoderImpl):
   def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
     estimated_size = 0
-    observables = []
     # Size of 32-bit integer storing number of elements.
     estimated_size += 4
-    for elem in value:
-      if isinstance(elem, observable.ObservableMixin):
-        observables.append((elem, self._elem_coder))
-      else:
+    if isinstance(value, observable.ObservableMixin):
+      return estimated_size, [(value, self._elem_coder)]
+    else:
+      observables = []
+      for elem in value:
         child_size, child_observables = (
             self._elem_coder.get_estimated_size_and_observables(
                 elem, nested=True))
         estimated_size += child_size
         observables += child_observables
-    return estimated_size, observables
+      return estimated_size, observables
 
 
 class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -529,14 +527,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     """Returns estimated size of value along with any nested observables."""
     estimated_size = 0
     observables = []
-    if isinstance(value.value, observable.ObservableMixin):
-      observables.append((value.value, self._value_coder))
-    else:
-      c = self._value_coder  # type cast
-      value_estimated_size, value_observables = (
-          c.get_estimated_size_and_observables(value.value, nested=True))
-      estimated_size += value_estimated_size
-      observables += value_observables
+    value_estimated_size, value_observables = (
+        self._value_coder.get_estimated_size_and_observables(
+            value.value, nested=True))
+    estimated_size += value_estimated_size
+    observables += value_observables
     estimated_size += (
         self._timestamp_coder.estimate_size(value.timestamp, nested=True))
     estimated_size += (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 008fa9f..a906437 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -210,7 +210,8 @@ class CodersTest(unittest.TestCase):
         return iter([1, 2, 3])
 
     # Coder for elements from the observable iterator.
-    iter_coder = coders.VarIntCoder()
+    elem_coder = coders.VarIntCoder()
+    iter_coder = coders.TupleSequenceCoder(elem_coder)
 
     # Test nested WindowedValue observable.
     coder = coders.WindowedValueCoder(iter_coder)
@@ -223,14 +224,14 @@ class CodersTest(unittest.TestCase):
       value = coders.coder_impl.WindowedValue(observ, 0, [])
     self.assertEqual(
         coder.get_impl().get_estimated_size_and_observables(value)[1],
-        [(observ, iter_coder.get_impl())])
+        [(observ, elem_coder.get_impl())])
 
     # Test nested tuple observable.
     coder = coders.TupleCoder((coders.StrUtf8Coder(), iter_coder))
     value = (u'123', observ)
     self.assertEqual(
         coder.get_impl().get_estimated_size_and_observables(value)[1],
-        [(observ, iter_coder.get_impl())])
+        [(observ, elem_coder.get_impl())])
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index 72605bf..f344b5d 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -32,5 +32,7 @@ class ObservableMixin(object):
     self.observers.append(callback)
 
   def notify_observers(self, value, **kwargs):
-    for o in self.observers:
-      o(value, **kwargs)
+    # self.observers is almost always empty
+    if self.observers:
+      for o in self.observers:
+        o(value, **kwargs)


[3/3] incubator-beam git commit: Closes #1224

Posted by ro...@apache.org.
Closes #1224


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/979e299c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/979e299c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/979e299c

Branch: refs/heads/python-sdk
Commit: 979e299c782cfc3f24af8466248f54ebb806f935
Parents: 03c3c70 fba0e91
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 31 09:07:35 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:35 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 50 +++++++++++---------
 .../apache_beam/coders/coders_test_common.py    |  7 +--
 sdks/python/apache_beam/coders/observable.py    |  6 ++-
 3 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------