You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/31 16:41:25 UTC
[1/3] incubator-beam git commit: Implement size observation for
FastPrimitivesCoderImpl.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 03c3c7074 -> 979e299c7
Implement size observation for FastPrimitivesCoderImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fba0e91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fba0e91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fba0e91b
Branch: refs/heads/python-sdk
Commit: fba0e91b18c542def24e560e44484fc825feeca6
Parents: 4ffdd88
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Oct 28 15:35:15 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:34 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba0e91b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index b31e493..c73dd31 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -214,6 +214,15 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
def __init__(self, fallback_coder_impl):
self.fallback_coder_impl = fallback_coder_impl
+ def get_estimated_size_and_observables(self, value, nested=False):
+ if isinstance(value, observable.ObservableMixin):
+ # FastPrimitivesCoderImpl can presumably encode the elements too.
+ return 1, [(value, self)]
+ else:
+ out = ByteCountingOutputStream()
+ self.encode_to_stream(value, out, nested)
+ return out.get_count(), []
+
def encode_to_stream(self, value, stream, nested):
t = type(value)
if t is NoneType:
[2/3] incubator-beam git commit: Several fixes to coder size
estimates.
Posted by ro...@apache.org.
Several fixes to coder size estimates.
The coder returned alongside the observable must be the coder
of the observable's elements, not the coder of the observable
itself.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4ffdd889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4ffdd889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4ffdd889
Branch: refs/heads/python-sdk
Commit: 4ffdd8894195e1cd390c651b0f71ed91ccd8b7b7
Parents: 03c3c70
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Oct 28 13:49:50 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:34 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 41 +++++++++-----------
.../apache_beam/coders/coders_test_common.py | 7 ++--
sdks/python/apache_beam/coders/observable.py | 6 ++-
3 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 606d51c..b31e493 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -415,6 +415,7 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
def estimate_size(self, value, nested=False):
"""Estimates the encoded size of the given value, in bytes."""
+ # TODO(ccy): This ignores sizes of observable components.
estimated_size, _ = (
self.get_estimated_size_and_observables(value))
return estimated_size
@@ -425,15 +426,11 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
estimated_size = 0
observables = []
for i in range(0, len(self._coder_impls)):
- child_value = values[i]
- if isinstance(child_value, observable.ObservableMixin):
- observables.append((child_value, self._coder_impls[i]))
- else:
- c = self._coder_impls[i] # type cast
- child_size, child_observables = (
- c.get_estimated_size_and_observables(child_value, nested=True))
- estimated_size += child_size
- observables += child_observables
+ c = self._coder_impls[i] # type cast
+ child_size, child_observables = (
+ c.get_estimated_size_and_observables(values[i], nested=True))
+ estimated_size += child_size
+ observables += child_observables
return estimated_size, observables
@@ -470,6 +467,7 @@ class SequenceCoderImpl(StreamCoderImpl):
def estimate_size(self, value, nested=False):
"""Estimates the encoded size of the given value, in bytes."""
+ # TODO(ccy): This ignores element sizes.
estimated_size, _ = (
self.get_estimated_size_and_observables(value))
return estimated_size
@@ -477,19 +475,19 @@ class SequenceCoderImpl(StreamCoderImpl):
def get_estimated_size_and_observables(self, value, nested=False):
"""Returns estimated size of value along with any nested observables."""
estimated_size = 0
- observables = []
# Size of 32-bit integer storing number of elements.
estimated_size += 4
- for elem in value:
- if isinstance(elem, observable.ObservableMixin):
- observables.append((elem, self._elem_coder))
- else:
+ if isinstance(value, observable.ObservableMixin):
+ return estimated_size, [(value, self._elem_coder)]
+ else:
+ observables = []
+ for elem in value:
child_size, child_observables = (
self._elem_coder.get_estimated_size_and_observables(
elem, nested=True))
estimated_size += child_size
observables += child_observables
- return estimated_size, observables
+ return estimated_size, observables
class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -529,14 +527,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
"""Returns estimated size of value along with any nested observables."""
estimated_size = 0
observables = []
- if isinstance(value.value, observable.ObservableMixin):
- observables.append((value.value, self._value_coder))
- else:
- c = self._value_coder # type cast
- value_estimated_size, value_observables = (
- c.get_estimated_size_and_observables(value.value, nested=True))
- estimated_size += value_estimated_size
- observables += value_observables
+ value_estimated_size, value_observables = (
+ self._value_coder.get_estimated_size_and_observables(
+ value.value, nested=True))
+ estimated_size += value_estimated_size
+ observables += value_observables
estimated_size += (
self._timestamp_coder.estimate_size(value.timestamp, nested=True))
estimated_size += (
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 008fa9f..a906437 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -210,7 +210,8 @@ class CodersTest(unittest.TestCase):
return iter([1, 2, 3])
# Coder for elements from the observable iterator.
- iter_coder = coders.VarIntCoder()
+ elem_coder = coders.VarIntCoder()
+ iter_coder = coders.TupleSequenceCoder(elem_coder)
# Test nested WindowedValue observable.
coder = coders.WindowedValueCoder(iter_coder)
@@ -223,14 +224,14 @@ class CodersTest(unittest.TestCase):
value = coders.coder_impl.WindowedValue(observ, 0, [])
self.assertEqual(
coder.get_impl().get_estimated_size_and_observables(value)[1],
- [(observ, iter_coder.get_impl())])
+ [(observ, elem_coder.get_impl())])
# Test nested tuple observable.
coder = coders.TupleCoder((coders.StrUtf8Coder(), iter_coder))
value = (u'123', observ)
self.assertEqual(
coder.get_impl().get_estimated_size_and_observables(value)[1],
- [(observ, iter_coder.get_impl())])
+ [(observ, elem_coder.get_impl())])
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ffdd889/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index 72605bf..f344b5d 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -32,5 +32,7 @@ class ObservableMixin(object):
self.observers.append(callback)
def notify_observers(self, value, **kwargs):
- for o in self.observers:
- o(value, **kwargs)
+ # self.observers is almost always empty
+ if self.observers:
+ for o in self.observers:
+ o(value, **kwargs)
[3/3] incubator-beam git commit: Closes #1224
Posted by ro...@apache.org.
Closes #1224
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/979e299c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/979e299c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/979e299c
Branch: refs/heads/python-sdk
Commit: 979e299c782cfc3f24af8466248f54ebb806f935
Parents: 03c3c70 fba0e91
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 31 09:07:35 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 09:07:35 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 50 +++++++++++---------
.../apache_beam/coders/coders_test_common.py | 7 +--
sdks/python/apache_beam/coders/observable.py | 6 ++-
3 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------