You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/12/06 17:08:31 UTC

[beam] branch master updated: [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0edc85e  [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)
0edc85e is described below

commit 0edc85ed763fe1fc1ca492964734b4bf76417f3d
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Thu Dec 6 09:08:23 2018 -0800

    [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders (#7081)
---
 sdks/python/apache_beam/coders/coders.py           | 89 +++++++++++++---------
 .../runners/dataflow/dataflow_runner.py            | 31 +++++---
 2 files changed, 73 insertions(+), 47 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f5c90a8..f2a4b2e 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -190,7 +190,7 @@ class Coder(object):
     # refined in user-defined Coders.
     return []
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     """For internal use only; no backwards-compatibility guarantees.
 
     Returns Google Cloud Dataflow API description of this coder."""
@@ -201,12 +201,17 @@ class Coder(object):
         # We pass coders in the form "<coder_name>$<pickled_data>" to make the
         # job description JSON more readable.  Data before the $ is ignored by
         # the worker.
-        '@type': serialize_coder(self),
-        'component_encodings': list(
-            component.as_cloud_object()
+        '@type':
+            serialize_coder(self),
+        'component_encodings': [
+            component.as_cloud_object(coders_context)
             for component in self._get_component_coders()
-        ),
+        ],
     }
+
+    if coders_context:
+      value['pipeline_proto_coder_id'] = coders_context.get_id(self)
+
     return value
 
   def __repr__(self):
@@ -370,7 +375,7 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:bytes',
     }
@@ -394,7 +399,7 @@ class VarIntCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:varint',
     }
@@ -516,8 +521,8 @@ class _PickleCoderBase(FastCoder):
     # GroupByKey operations.
     return False
 
-  def as_cloud_object(self, is_pair_like=True):
-    value = super(_PickleCoderBase, self).as_cloud_object()
+  def as_cloud_object(self, coders_context=None, is_pair_like=True):
+    value = super(_PickleCoderBase, self).as_cloud_object(coders_context)
     # We currently use this coder in places where we cannot infer the coder to
     # use for the value type in a more granular way.  In places where the
     # service expects a pair, it checks for the "is_pair_like" key, in which
@@ -525,8 +530,8 @@ class _PickleCoderBase(FastCoder):
     if is_pair_like:
       value['is_pair_like'] = True
       value['component_encodings'] = [
-          self.as_cloud_object(is_pair_like=False),
-          self.as_cloud_object(is_pair_like=False)
+          self.as_cloud_object(coders_context, is_pair_like=False),
+          self.as_cloud_object(coders_context, is_pair_like=False)
       ]
 
     return value
@@ -615,8 +620,8 @@ class FastPrimitivesCoder(FastCoder):
     else:
       return DeterministicFastPrimitivesCoder(self, step_label)
 
-  def as_cloud_object(self, is_pair_like=True):
-    value = super(FastCoder, self).as_cloud_object()
+  def as_cloud_object(self, coders_context=None, is_pair_like=True):
+    value = super(FastCoder, self).as_cloud_object(coders_context)
     # We currently use this coder in places where we cannot infer the coder to
     # use for the value type in a more granular way.  In places where the
     # service expects a pair, it checks for the "is_pair_like" key, in which
@@ -624,8 +629,8 @@ class FastPrimitivesCoder(FastCoder):
     if is_pair_like:
       value['is_pair_like'] = True
       value['component_encodings'] = [
-          self.as_cloud_object(is_pair_like=False),
-          self.as_cloud_object(is_pair_like=False)
+          self.as_cloud_object(coders_context, is_pair_like=False),
+          self.as_cloud_object(coders_context, is_pair_like=False)
       ]
 
     return value
@@ -744,18 +749,20 @@ class TupleCoder(FastCoder):
   def from_type_hint(typehint, registry):
     return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     if self.is_kv_coder():
       return {
-          '@type': 'kind:pair',
-          'is_pair_like': True,
-          'component_encodings': list(
-              component.as_cloud_object()
+          '@type':
+              'kind:pair',
+          'is_pair_like':
+              True,
+          'component_encodings': [
+              component.as_cloud_object(coders_context)
               for component in self._get_component_coders()
-          ),
+          ],
       }
 
-    return super(TupleCoder, self).as_cloud_object()
+    return super(TupleCoder, self).as_cloud_object(coders_context)
 
   def _get_component_coders(self):
     return self.coders()
@@ -853,11 +860,15 @@ class IterableCoder(FastCoder):
       return IterableCoder(
           self._elem_coder.as_deterministic_coder(step_label, error_message))
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:stream',
-        'is_stream_like': True,
-        'component_encodings': [self._elem_coder.as_cloud_object()],
+        '@type':
+            'kind:stream',
+        'is_stream_like':
+            True,
+        'component_encodings': [
+            self._elem_coder.as_cloud_object(coders_context)
+        ],
     }
 
   def value_coder(self):
@@ -891,7 +902,7 @@ class GlobalWindowCoder(SingletonCoder):
     from apache_beam.transforms import window
     super(GlobalWindowCoder, self).__init__(window.GlobalWindow())
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:global_window',
     }
@@ -910,7 +921,7 @@ class IntervalWindowCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:interval_window',
     }
@@ -947,13 +958,16 @@ class WindowedValueCoder(FastCoder):
                                               self.timestamp_coder,
                                               self.window_coder])
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:windowed_value',
-        'is_wrapper': True,
+        '@type':
+            'kind:windowed_value',
+        'is_wrapper':
+            True,
         'component_encodings': [
-            component.as_cloud_object()
-            for component in self._get_component_coders()],
+            component.as_cloud_object(coders_context)
+            for component in self._get_component_coders()
+        ],
     }
 
   def _get_component_coders(self):
@@ -1007,10 +1021,13 @@ class LengthPrefixCoder(FastCoder):
   def value_coder(self):
     return self._value_coder
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:length_prefix',
-        'component_encodings': [self._value_coder.as_cloud_object()],
+        '@type':
+            'kind:length_prefix',
+        'component_encodings': [
+            self._value_coder.as_cloud_object(coders_context)
+        ],
     }
 
   def _get_component_coders(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 88b03d4..f53808d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -398,10 +398,10 @@ class DataflowRunner(PipelineRunner):
     result.metric_results = self._metrics
     return result
 
-  def _get_typehint_based_encoding(self, typehint, window_coder):
+  def _get_typehint_based_encoding(self, typehint, window_coder, use_fnapi):
     """Returns an encoding based on a typehint object."""
-    return self._get_cloud_encoding(self._get_coder(typehint,
-                                                    window_coder=window_coder))
+    return self._get_cloud_encoding(
+        self._get_coder(typehint, window_coder=window_coder), use_fnapi)
 
   @staticmethod
   def _get_coder(typehint, window_coder):
@@ -412,12 +412,13 @@ class DataflowRunner(PipelineRunner):
           window_coder=window_coder)
     return coders.registry.get_coder(typehint)
 
-  def _get_cloud_encoding(self, coder):
+  def _get_cloud_encoding(self, coder, use_fnapi):
     """Returns an encoding based on a coder object."""
     if not isinstance(coder, coders.Coder):
       raise TypeError('Coder object must inherit from coders.Coder: %s.' %
                       str(coder))
-    return coder.as_cloud_object()
+    return coder.as_cloud_object(self.proto_context
+                                 .coders if use_fnapi else None)
 
   def _get_side_input_encoding(self, input_encoding):
     """Returns an encoding for the output of a view transform.
@@ -454,8 +455,11 @@ class DataflowRunner(PipelineRunner):
           transform_node.outputs[None].windowing.windowfn.get_window_coder())
     else:
       window_coder = None
-    return self._get_typehint_based_encoding(
-        element_type, window_coder=window_coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(
+        transform_node.outputs.values()[0].pipeline._options)
+    return self._get_typehint_based_encoding(element_type, window_coder,
+                                             use_fnapi)
 
   def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
     """Creates a Step object and adds it to the cache."""
@@ -753,7 +757,8 @@ class DataflowRunner(PipelineRunner):
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
     from apache_beam.runners.dataflow.internal import apiclient
-    if apiclient._use_fnapi(options):
+    use_fnapi = apiclient._use_fnapi(options)
+    if use_fnapi:
       # Fnapi pipelines send the transform ID of the CombineValues transform's
       # parent composite because Dataflow expects the ID of a CombinePerKey
       # transform.
@@ -775,7 +780,7 @@ class DataflowRunner(PipelineRunner):
     # Note that the accumulator must not have a WindowedValue encoding, while
     # the output of this step does in fact have a WindowedValue encoding.
     accumulator_encoding = self._get_cloud_encoding(
-        transform_node.transform.fn.get_accumulator_coder())
+        transform_node.transform.fn.get_accumulator_coder(), use_fnapi)
     output_encoding = self._get_encoded_output_coder(transform_node)
 
     step.encoding = output_encoding
@@ -911,7 +916,9 @@ class DataflowRunner(PipelineRunner):
         coders.registry.get_coder(transform_node.outputs[None].element_type),
         coders.coders.GlobalWindowCoder())
 
-    step.encoding = self._get_cloud_encoding(coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(options)
+    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
         [{PropertyNames.USER_NAME: (
@@ -993,7 +1000,9 @@ class DataflowRunner(PipelineRunner):
     # correct coder.
     coder = coders.WindowedValueCoder(transform.sink.coder,
                                       coders.coders.GlobalWindowCoder())
-    step.encoding = self._get_cloud_encoding(coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(options)
+    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,