You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/12/06 17:08:31 UTC
[beam] branch master updated: [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0edc85e [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)
0edc85e is described below
commit 0edc85ed763fe1fc1ca492964734b4bf76417f3d
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Thu Dec 6 09:08:23 2018 -0800
[BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)
---
sdks/python/apache_beam/coders/coders.py | 89 +++++++++++++---------
.../runners/dataflow/dataflow_runner.py | 31 +++++---
2 files changed, 73 insertions(+), 47 deletions(-)
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f5c90a8..f2a4b2e 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -190,7 +190,7 @@ class Coder(object):
# refined in user-defined Coders.
return []
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
"""For internal use only; no backwards-compatibility guarantees.
Returns Google Cloud Dataflow API description of this coder."""
@@ -201,12 +201,17 @@ class Coder(object):
# We pass coders in the form "<coder_name>$<pickled_data>" to make the
# job description JSON more readable. Data before the $ is ignored by
# the worker.
- '@type': serialize_coder(self),
- 'component_encodings': list(
- component.as_cloud_object()
+ '@type':
+ serialize_coder(self),
+ 'component_encodings': [
+ component.as_cloud_object(coders_context)
for component in self._get_component_coders()
- ),
+ ],
}
+
+ if coders_context:
+ value['pipeline_proto_coder_id'] = coders_context.get_id(self)
+
return value
def __repr__(self):
@@ -370,7 +375,7 @@ class BytesCoder(FastCoder):
def is_deterministic(self):
return True
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:bytes',
}
@@ -394,7 +399,7 @@ class VarIntCoder(FastCoder):
def is_deterministic(self):
return True
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:varint',
}
@@ -516,8 +521,8 @@ class _PickleCoderBase(FastCoder):
# GroupByKey operations.
return False
- def as_cloud_object(self, is_pair_like=True):
- value = super(_PickleCoderBase, self).as_cloud_object()
+ def as_cloud_object(self, coders_context=None, is_pair_like=True):
+ value = super(_PickleCoderBase, self).as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
@@ -525,8 +530,8 @@ class _PickleCoderBase(FastCoder):
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
- self.as_cloud_object(is_pair_like=False),
- self.as_cloud_object(is_pair_like=False)
+ self.as_cloud_object(coders_context, is_pair_like=False),
+ self.as_cloud_object(coders_context, is_pair_like=False)
]
return value
@@ -615,8 +620,8 @@ class FastPrimitivesCoder(FastCoder):
else:
return DeterministicFastPrimitivesCoder(self, step_label)
- def as_cloud_object(self, is_pair_like=True):
- value = super(FastCoder, self).as_cloud_object()
+ def as_cloud_object(self, coders_context=None, is_pair_like=True):
+ value = super(FastCoder, self).as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
@@ -624,8 +629,8 @@ class FastPrimitivesCoder(FastCoder):
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
- self.as_cloud_object(is_pair_like=False),
- self.as_cloud_object(is_pair_like=False)
+ self.as_cloud_object(coders_context, is_pair_like=False),
+ self.as_cloud_object(coders_context, is_pair_like=False)
]
return value
@@ -744,18 +749,20 @@ class TupleCoder(FastCoder):
def from_type_hint(typehint, registry):
return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
if self.is_kv_coder():
return {
- '@type': 'kind:pair',
- 'is_pair_like': True,
- 'component_encodings': list(
- component.as_cloud_object()
+ '@type':
+ 'kind:pair',
+ 'is_pair_like':
+ True,
+ 'component_encodings': [
+ component.as_cloud_object(coders_context)
for component in self._get_component_coders()
- ),
+ ],
}
- return super(TupleCoder, self).as_cloud_object()
+ return super(TupleCoder, self).as_cloud_object(coders_context)
def _get_component_coders(self):
return self.coders()
@@ -853,11 +860,15 @@ class IterableCoder(FastCoder):
return IterableCoder(
self._elem_coder.as_deterministic_coder(step_label, error_message))
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
- '@type': 'kind:stream',
- 'is_stream_like': True,
- 'component_encodings': [self._elem_coder.as_cloud_object()],
+ '@type':
+ 'kind:stream',
+ 'is_stream_like':
+ True,
+ 'component_encodings': [
+ self._elem_coder.as_cloud_object(coders_context)
+ ],
}
def value_coder(self):
@@ -891,7 +902,7 @@ class GlobalWindowCoder(SingletonCoder):
from apache_beam.transforms import window
super(GlobalWindowCoder, self).__init__(window.GlobalWindow())
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:global_window',
}
@@ -910,7 +921,7 @@ class IntervalWindowCoder(FastCoder):
def is_deterministic(self):
return True
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:interval_window',
}
@@ -947,13 +958,16 @@ class WindowedValueCoder(FastCoder):
self.timestamp_coder,
self.window_coder])
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
- '@type': 'kind:windowed_value',
- 'is_wrapper': True,
+ '@type':
+ 'kind:windowed_value',
+ 'is_wrapper':
+ True,
'component_encodings': [
- component.as_cloud_object()
- for component in self._get_component_coders()],
+ component.as_cloud_object(coders_context)
+ for component in self._get_component_coders()
+ ],
}
def _get_component_coders(self):
@@ -1007,10 +1021,13 @@ class LengthPrefixCoder(FastCoder):
def value_coder(self):
return self._value_coder
- def as_cloud_object(self):
+ def as_cloud_object(self, coders_context=None):
return {
- '@type': 'kind:length_prefix',
- 'component_encodings': [self._value_coder.as_cloud_object()],
+ '@type':
+ 'kind:length_prefix',
+ 'component_encodings': [
+ self._value_coder.as_cloud_object(coders_context)
+ ],
}
def _get_component_coders(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 88b03d4..f53808d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -398,10 +398,10 @@ class DataflowRunner(PipelineRunner):
result.metric_results = self._metrics
return result
- def _get_typehint_based_encoding(self, typehint, window_coder):
+ def _get_typehint_based_encoding(self, typehint, window_coder, use_fnapi):
"""Returns an encoding based on a typehint object."""
- return self._get_cloud_encoding(self._get_coder(typehint,
- window_coder=window_coder))
+ return self._get_cloud_encoding(
+ self._get_coder(typehint, window_coder=window_coder), use_fnapi)
@staticmethod
def _get_coder(typehint, window_coder):
@@ -412,12 +412,13 @@ class DataflowRunner(PipelineRunner):
window_coder=window_coder)
return coders.registry.get_coder(typehint)
- def _get_cloud_encoding(self, coder):
+ def _get_cloud_encoding(self, coder, use_fnapi):
"""Returns an encoding based on a coder object."""
if not isinstance(coder, coders.Coder):
raise TypeError('Coder object must inherit from coders.Coder: %s.' %
str(coder))
- return coder.as_cloud_object()
+ return coder.as_cloud_object(self.proto_context
+ .coders if use_fnapi else None)
def _get_side_input_encoding(self, input_encoding):
"""Returns an encoding for the output of a view transform.
@@ -454,8 +455,11 @@ class DataflowRunner(PipelineRunner):
transform_node.outputs[None].windowing.windowfn.get_window_coder())
else:
window_coder = None
- return self._get_typehint_based_encoding(
- element_type, window_coder=window_coder)
+ from apache_beam.runners.dataflow.internal import apiclient
+ use_fnapi = apiclient._use_fnapi(
+ transform_node.outputs.values()[0].pipeline._options)
+ return self._get_typehint_based_encoding(element_type, window_coder,
+ use_fnapi)
def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
"""Creates a Step object and adds it to the cache."""
@@ -753,7 +757,8 @@ class DataflowRunner(PipelineRunner):
# The data transmitted in SERIALIZED_FN is different depending on whether
# this is a fnapi pipeline or not.
from apache_beam.runners.dataflow.internal import apiclient
- if apiclient._use_fnapi(options):
+ use_fnapi = apiclient._use_fnapi(options)
+ if use_fnapi:
# Fnapi pipelines send the transform ID of the CombineValues transform's
# parent composite because Dataflow expects the ID of a CombinePerKey
# transform.
@@ -775,7 +780,7 @@ class DataflowRunner(PipelineRunner):
# Note that the accumulator must not have a WindowedValue encoding, while
# the output of this step does in fact have a WindowedValue encoding.
accumulator_encoding = self._get_cloud_encoding(
- transform_node.transform.fn.get_accumulator_coder())
+ transform_node.transform.fn.get_accumulator_coder(), use_fnapi)
output_encoding = self._get_encoded_output_coder(transform_node)
step.encoding = output_encoding
@@ -911,7 +916,9 @@ class DataflowRunner(PipelineRunner):
coders.registry.get_coder(transform_node.outputs[None].element_type),
coders.coders.GlobalWindowCoder())
- step.encoding = self._get_cloud_encoding(coder)
+ from apache_beam.runners.dataflow.internal import apiclient
+ use_fnapi = apiclient._use_fnapi(options)
+ step.encoding = self._get_cloud_encoding(coder, use_fnapi)
step.add_property(
PropertyNames.OUTPUT_INFO,
[{PropertyNames.USER_NAME: (
@@ -993,7 +1000,9 @@ class DataflowRunner(PipelineRunner):
# correct coder.
coder = coders.WindowedValueCoder(transform.sink.coder,
coders.coders.GlobalWindowCoder())
- step.encoding = self._get_cloud_encoding(coder)
+ from apache_beam.runners.dataflow.internal import apiclient
+ use_fnapi = apiclient._use_fnapi(options)
+ step.encoding = self._get_cloud_encoding(coder, use_fnapi)
step.add_property(PropertyNames.ENCODING, step.encoding)
step.add_property(
PropertyNames.PARALLEL_INPUT,