You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/28 22:28:17 UTC
[1/2] beam git commit: [BEAM-1226] Add support for well-known coder
types to Apache Beam Python SDK
Repository: beam
Updated Branches:
refs/heads/python-sdk eff50118b -> 2999e2290
[BEAM-1226] Add support for well-known coder types to Apache Beam Python SDK
This uses specific cloud object representations for the following types:
kind:pair (TupleCoder with two components, previously pickled)
kind:stream (IterableCoder with a single component, previously ignored)
kind:global_window (GlobalWindowCoder, previously SingletonCoder)
kind:length_prefix (A new coder that always prefixes the encoded value with its varint-encoded length in bytes; has a single component)
kind:windowed_value (A wrapper coder with two components: the value coder and the window coder)
This also drops the ability to configure the timestamp coder on WindowedValueCoder.
These changes work towards a common binary representation for certain well-known coders across multiple languages.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6272e296
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6272e296
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6272e296
Branch: refs/heads/python-sdk
Commit: 6272e296c946311e28f3edc848eaa6caf794ef9d
Parents: eff5011
Author: Luke Cwik <lc...@google.com>
Authored: Wed Dec 28 13:54:58 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 13:54:58 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 35 +++++--
sdks/python/apache_beam/coders/coders.py | 99 +++++++++++++++++---
.../apache_beam/coders/coders_test_common.py | 87 ++++++++++++++++-
.../apache_beam/runners/dataflow_runner.py | 3 +-
sdks/python/apache_beam/transforms/window.py | 4 +-
.../apache_beam/transforms/window_test.py | 9 ++
6 files changed, 208 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 47a837f..fcdc441 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -60,7 +60,7 @@ class CoderImpl(object):
raise NotImplementedError
def decode(self, encoded):
- """Encodes an object to an unnested string."""
+ """Decodes an object to an unnested string."""
raise NotImplementedError
def estimate_size(self, value, nested=False):
@@ -535,7 +535,7 @@ class WindowedValueCoderImpl(StreamCoderImpl):
"""A coder for windowed values."""
def __init__(self, value_coder, timestamp_coder, window_coder):
- # TODO(robertwb): Do we need the ability to customize timestamp_coder?
+ # TODO(lcwik): Remove the timestamp coder field
self._value_coder = value_coder
self._timestamp_coder = timestamp_coder
self._windows_coder = TupleSequenceCoderImpl(window_coder)
@@ -543,20 +543,15 @@ class WindowedValueCoderImpl(StreamCoderImpl):
def encode_to_stream(self, value, out, nested):
wv = value # type cast
self._value_coder.encode_to_stream(wv.value, out, True)
- if isinstance(self._timestamp_coder, TimestampCoderImpl):
- # Avoid creation of Timestamp object.
- out.write_bigendian_int64(wv.timestamp_micros)
- else:
- self._timestamp_coder.encode_to_stream(wv.timestamp, out, True)
+ # Avoid creation of Timestamp object.
+ out.write_bigendian_int64(wv.timestamp_micros)
self._windows_coder.encode_to_stream(wv.windows, out, True)
def decode_from_stream(self, in_stream, nested):
return windowed_value.create(
self._value_coder.decode_from_stream(in_stream, True),
# Avoid creation of Timestamp object.
- in_stream.read_bigendian_int64()
- if isinstance(self._timestamp_coder, TimestampCoderImpl)
- else self._timestamp_coder.decode_from_stream(in_stream, True).micros,
+ in_stream.read_bigendian_int64(),
self._windows_coder.decode_from_stream(in_stream, True))
def get_estimated_size_and_observables(self, value, nested=False):
@@ -577,3 +572,23 @@ class WindowedValueCoderImpl(StreamCoderImpl):
estimated_size += (
self._windows_coder.estimate_size(value.windows, nested=True))
return estimated_size, observables
+
+
+class LengthPrefixCoderImpl(StreamCoderImpl):
+ """Coder which prefixes the length of the encoded object in the stream."""
+
+ def __init__(self, value_coder):
+ self._value_coder = value_coder
+
+ def encode_to_stream(self, value, out, nested):
+ encoded_value = self._value_coder.encode(value)
+ out.write_var_int64(len(encoded_value))
+ out.write(encoded_value)
+
+ def decode_from_stream(self, in_stream, nested):
+ value_length = in_stream.read_var_int64()
+ return self._value_coder.decode(in_stream.read(value_length))
+
+ def estimate_size(self, value, nested=False):
+ value_size = self._value_coder.estimate_size(value)
+ return get_varint_size(value_size) + value_size
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1e78b1d..67bbbe6 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -23,6 +23,13 @@ import google.protobuf
from apache_beam.coders import coder_impl
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+ from stream import get_varint_size
+except ImportError:
+ from slow_stream import get_varint_size
+# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+
# pylint: disable=wrong-import-order, wrong-import-position
# Avoid dependencies on the full SDK.
@@ -161,7 +168,8 @@ class Coder(object):
'@type': serialize_coder(self),
'component_encodings': list(
component.as_cloud_object()
- for component in self._get_component_coders())
+ for component in self._get_component_coders()
+ ),
}
return value
@@ -501,9 +509,17 @@ class TupleCoder(FastCoder):
return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
def as_cloud_object(self):
- value = super(TupleCoder, self).as_cloud_object()
- value['is_pair_like'] = True
- return value
+ if self.is_kv_coder():
+ return {
+ '@type': 'kind:pair',
+ 'is_pair_like': True,
+ 'component_encodings': list(
+ component.as_cloud_object()
+ for component in self._get_component_coders()
+ ),
+ }
+
+ return super(TupleCoder, self).as_cloud_object()
def _get_component_coders(self):
return self.coders()
@@ -563,6 +579,16 @@ class IterableCoder(FastCoder):
def is_deterministic(self):
return self._elem_coder.is_deterministic()
+ def as_cloud_object(self):
+ return {
+ '@type': 'kind:stream',
+ 'is_stream_like': True,
+ 'component_encodings': [self._elem_coder.as_cloud_object()],
+ }
+
+ def value_coder(self):
+ return self._elem_coder
+
@staticmethod
def from_type_hint(typehint, registry):
return IterableCoder(registry.get_coder(typehint.inner_type))
@@ -590,17 +616,27 @@ class WindowCoder(PickleCoder):
return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
+class GlobalWindowCoder(SingletonCoder):
+ """Coder for global windows."""
+
+ def __init__(self):
+ from apache_beam.transforms import window
+ super(GlobalWindowCoder, self).__init__(window.GlobalWindow())
+
+ def as_cloud_object(self):
+ return {
+ '@type': 'kind:global_window',
+ }
+
+
class WindowedValueCoder(FastCoder):
"""Coder for windowed values."""
- def __init__(self, wrapped_value_coder, timestamp_coder=None,
- window_coder=None):
- if not timestamp_coder:
- timestamp_coder = TimestampCoder()
+ def __init__(self, wrapped_value_coder, window_coder=None):
if not window_coder:
window_coder = PickleCoder()
self.wrapped_value_coder = wrapped_value_coder
- self.timestamp_coder = timestamp_coder
+ self.timestamp_coder = TimestampCoder()
self.window_coder = window_coder
def _create_impl(self):
@@ -615,12 +651,16 @@ class WindowedValueCoder(FastCoder):
self.window_coder])
def as_cloud_object(self):
- value = super(WindowedValueCoder, self).as_cloud_object()
- value['is_wrapper'] = True
- return value
+ return {
+ '@type': 'kind:windowed_value',
+ 'is_wrapper': True,
+ 'component_encodings': [
+ component.as_cloud_object()
+ for component in self._get_component_coders()],
+ }
def _get_component_coders(self):
- return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]
+ return [self.wrapped_value_coder, self.window_coder]
def is_kv_coder(self):
return self.wrapped_value_coder.is_kv_coder()
@@ -633,3 +673,36 @@ class WindowedValueCoder(FastCoder):
def __repr__(self):
return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
+
+
+class LengthPrefixCoder(FastCoder):
+ """Coder which prefixes the length of the encoded object in the stream."""
+
+ def __init__(self, value_coder):
+ self._value_coder = value_coder
+
+ def _create_impl(self):
+ return coder_impl.LengthPrefixCoderImpl(self._value_coder)
+
+ def is_deterministic(self):
+ return self._value_coder.is_deterministic()
+
+ def estimate_size(self, value):
+ value_size = self._value_coder.estimate_size(value)
+ return get_varint_size(value_size) + value_size
+
+ def value_coder(self):
+ return self._value_coder
+
+ def as_cloud_object(self):
+ return {
+ '@type': 'kind:length_prefix',
+ 'component_encodings': [self._value_coder.as_cloud_object()],
+ }
+
+ def _get_component_coders(self):
+ return (self._value_coder,)
+
+ def __repr__(self):
+ return 'LengthPrefixCoder[%r]' % self._value_coder
+
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index bfd4d77..b2bcb96 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -25,6 +25,7 @@ import dill
import coders
import observable
+from apache_beam.transforms import window
from apache_beam.utils import timestamp
from apache_beam.utils import windowed_value
@@ -179,11 +180,28 @@ class CodersTest(unittest.TestCase):
(timestamp.Timestamp.of(27), 'abc'))
def test_tuple_coder(self):
+ kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
+ # Verify cloud object representation
+ self.assertEqual(
+ {
+ '@type': 'kind:pair',
+ 'is_pair_like': True,
+ 'component_encodings': [
+ coders.VarIntCoder().as_cloud_object(),
+ coders.BytesCoder().as_cloud_object()],
+ },
+ kv_coder.as_cloud_object())
+ # Test binary representation
+ self.assertEqual(
+ '\x04\x03abc',
+ kv_coder.encode((4, 'abc')))
+ # Test unnested
self.check_coder(
- coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),
+ kv_coder,
(1, 'a'),
(-2, 'a' * 100),
(300, 'abc\0' * 5))
+ # Test nested
self.check_coder(
coders.TupleCoder(
(coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
@@ -206,18 +224,47 @@ class CodersTest(unittest.TestCase):
self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
def test_iterable_coder(self):
- self.check_coder(coders.IterableCoder(coders.VarIntCoder()),
+ iterable_coder = coders.IterableCoder(coders.VarIntCoder())
+ # Verify cloud object representation
+ self.assertEqual(
+ {
+ '@type': 'kind:stream',
+ 'is_stream_like': True,
+ 'component_encodings': [coders.VarIntCoder().as_cloud_object()]
+ },
+ iterable_coder.as_cloud_object())
+ # Test unnested
+ self.check_coder(iterable_coder,
[1], [-1, 0, 100])
+ # Test nested
self.check_coder(
coders.TupleCoder((coders.VarIntCoder(),
coders.IterableCoder(coders.VarIntCoder()))),
(1, [1, 2, 3]))
def test_windowed_value_coder(self):
+ coder = coders.WindowedValueCoder(coders.VarIntCoder(),
+ coders.GlobalWindowCoder())
+ # Verify cloud object representation
+ self.assertEqual(
+ {
+ '@type': 'kind:windowed_value',
+ 'is_wrapper': True,
+ 'component_encodings': [
+ coders.VarIntCoder().as_cloud_object(),
+ coders.GlobalWindowCoder().as_cloud_object(),
+ ],
+ },
+ coder.as_cloud_object())
+ # Test binary representation
+ self.assertEqual('\x01\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
+ coder.encode(window.GlobalWindows.windowed_value(1)))
+ # Test unnested
self.check_coder(
coders.WindowedValueCoder(coders.VarIntCoder()),
windowed_value.WindowedValue(3, -100, ()),
windowed_value.WindowedValue(-1, 100, (1, 2, 3)))
+ # Test nested
self.check_coder(
coders.TupleCoder((
coders.WindowedValueCoder(coders.FloatCoder()),
@@ -241,6 +288,42 @@ class CodersTest(unittest.TestCase):
self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())),
(ma, 'a'), (mb, 'b'))
+ def test_global_window_coder(self):
+ coder = coders.GlobalWindowCoder()
+ value = window.GlobalWindow()
+ # Verify cloud object representation
+ self.assertEqual({'@type': 'kind:global_window'},
+ coder.as_cloud_object())
+ # Test binary representation
+ self.assertEqual('', coder.encode(value))
+ self.assertEqual(value, coder.decode(''))
+ # Test unnested
+ self.check_coder(coder, value)
+ # Test nested
+ self.check_coder(coders.TupleCoder((coder, coder)),
+ (value, value))
+
+ def test_length_prefix_coder(self):
+ coder = coders.LengthPrefixCoder(coders.BytesCoder())
+ # Verify cloud object representation
+ self.assertEqual(
+ {
+ '@type': 'kind:length_prefix',
+ 'component_encodings': [coders.BytesCoder().as_cloud_object()]
+ },
+ coder.as_cloud_object())
+ # Test binary representation
+ self.assertEqual('\x00', coder.encode(''))
+ self.assertEqual('\x01a', coder.encode('a'))
+ self.assertEqual('\x02bc', coder.encode('bc'))
+ self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383))
+ # Test unnested
+ self.check_coder(coder, '', 'a', 'bc', 'def')
+ # Test nested
+ self.check_coder(coders.TupleCoder((coder, coder)),
+ ('', 'a'),
+ ('bc', 'def'))
+
def test_nested_observables(self):
class FakeObservableIterator(observable.ObservableMixin):
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 392a166..3ee95c5 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -206,8 +206,7 @@ class DataflowRunner(PipelineRunner):
if window_coder:
return coders.WindowedValueCoder(
coders.registry.get_coder(typehint),
- coders.TimestampCoder(),
- window_coder)
+ window_coder=window_coder)
else:
return coders.registry.get_coder(typehint)
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 9485032..70759e0 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -111,7 +111,7 @@ class WindowFn(object):
raise NotImplementedError
def get_window_coder(self):
- return coders.PickleCoder()
+ return coders.WindowCoder()
def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument
"""Given input time and output window, returns output time for window.
@@ -240,7 +240,7 @@ class GlobalWindows(WindowFn):
pass # No merging.
def get_window_coder(self):
- return coders.SingletonCoder(GlobalWindow())
+ return coders.GlobalWindowCoder()
def __hash__(self):
return hash(type(self))
http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 6c3c98e..856d011 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,6 +28,8 @@ from apache_beam.transforms import GroupByKey
from apache_beam.transforms import Map
from apache_beam.transforms import window
from apache_beam.transforms import WindowInto
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
@@ -55,6 +57,13 @@ reify_windows = core.ParDo(ReifyWindowsFn())
class WindowTest(unittest.TestCase):
+ def test_global_window(self):
+ self.assertEqual(window.GlobalWindow(), window.GlobalWindow())
+ self.assertNotEqual(window.GlobalWindow(),
+ window.IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP))
+ self.assertNotEqual(window.IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP),
+ window.GlobalWindow())
+
def test_fixed_windows(self):
# Test windows with offset: 2, 7, 12, 17, ...
windowfn = window.FixedWindows(size=5, offset=2)
[2/2] beam git commit: [BEAM-1226] Add support for well-known coder
types to Apache Beam Python SDK
Posted by lc...@apache.org.
[BEAM-1226] Add support for well-known coder types to Apache Beam Python SDK
This closes #1709
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2999e229
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2999e229
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2999e229
Branch: refs/heads/python-sdk
Commit: 2999e2290cb3157f26a695304dcb8e6ddf4968c3
Parents: eff5011 6272e29
Author: Luke Cwik <lc...@google.com>
Authored: Wed Dec 28 14:28:02 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Dec 28 14:28:02 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 35 +++++--
sdks/python/apache_beam/coders/coders.py | 99 +++++++++++++++++---
.../apache_beam/coders/coders_test_common.py | 87 ++++++++++++++++-
.../apache_beam/runners/dataflow_runner.py | 3 +-
sdks/python/apache_beam/transforms/window.py | 4 +-
.../apache_beam/transforms/window_test.py | 9 ++
6 files changed, 208 insertions(+), 29 deletions(-)
----------------------------------------------------------------------