You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:22 UTC

[3/5] beam git commit: Translate ParDo operations through the Runner API.

Translate ParDo operations through the Runner API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d64b769
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d64b769
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d64b769

Branch: refs/heads/master
Commit: 7d64b769a7099e767a81c693294b5a66791a441b
Parents: 53c2c8f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 16:30:56 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py        | 11 +++++++
 sdks/python/apache_beam/transforms/core.py | 38 ++++++++++++++++++++++++-
 sdks/python/apache_beam/utils/urns.py      |  3 ++
 3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index fe36d85..8553f7c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -66,6 +66,7 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
 from apache_beam.utils.annotations import deprecated
+from apache_beam.utils import urns
 
 
 __all__ = ['Pipeline']
@@ -474,6 +475,12 @@ class Pipeline(object):
     return str, ('Pickled pipeline stub.',)
 
   def _verify_runner_api_compatible(self):
+    if self._options.view_as(TypeOptions).runtime_type_check:
+      # This option is incompatible with the runner API as it requires
+      # the runner to inspect non-serialized hints on the transform
+      # itself.
+      return False
+
     class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
       ok = True  # Really a nonlocal.
 
@@ -745,6 +752,10 @@ class AppliedPTransform(object):
     result.outputs = {
         None if tag == 'None' else tag: context.pcollections.get_by_id(id)
         for tag, id in proto.outputs.items()}
+    # This annotation is expected by some runners.
+    if proto.spec.urn == urns.PARDO_TRANSFORM:
+      result.transform.output_tags = set(proto.outputs.keys()).difference(
+          {'None'})
     if not result.parts:
       for tag, pc in result.outputs.items():
         if pc not in result.inputs:

http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 25fe39f..9168a89 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -23,10 +23,13 @@ import copy
 import inspect
 import types
 
+from google.protobuf import wrappers_pb2
+
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam import coders
 from apache_beam.coders import typecoders
+from apache_beam.internal import pickler
 from apache_beam.internal import util
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import ptransform
@@ -50,6 +53,7 @@ from apache_beam.typehints.decorators import TypeCheckError
 from apache_beam.typehints.decorators import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.typehints.typehints import is_consistent_with
+from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
 from apache_beam.options.pipeline_options import TypeOptions
 
@@ -136,7 +140,7 @@ class DoFnProcessContext(DoFnContext):
       self.windows = windowed_value.windows
 
 
-class DoFn(WithTypeHints, HasDisplayData):
+class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   """A function object used by a transform with custom processing.
 
   The ParDo transform is such a transform. The ParDo.apply
@@ -236,6 +240,8 @@ class DoFn(WithTypeHints, HasDisplayData):
       return False # Method is a classmethod
     return True
 
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_DO_FN)
+
 
 def _fn_takes_side_inputs(fn):
   try:
@@ -686,6 +692,36 @@ class ParDo(PTransformWithSideInputs):
       raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
     return _MultiParDo(self, tags, main_tag)
 
+  def _pardo_fn_data(self):
+    si_tags_and_types = []
+    windowing = None
+    return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
+
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.PARDO_TRANSFORM,
+        beam_runner_api_pb2.ParDoPayload(
+            do_fn=beam_runner_api_pb2.SdkFunctionSpec(
+                spec=beam_runner_api_pb2.FunctionSpec(
+                    urn=urns.PICKLED_DO_FN_INFO,
+                    parameter=proto_utils.pack_Any(
+                        wrappers_pb2.BytesValue(
+                            value=pickler.dumps(
+                                self._pardo_fn_data())))))))
+
+  @PTransform.register_urn(
+      urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
+  def from_runner_api_parameter(pardo_payload, context):
+    assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
+    fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
+        proto_utils.unpack_Any(
+            pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value)
+    if si_tags_and_types:
+      raise NotImplementedError('deferred side inputs')
+    elif windowing:
+      raise NotImplementedError('explicit windowing')
+    return ParDo(fn, *args, **kwargs)
+
 
 class _MultiParDo(PTransform):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 7110802..dcdf0f3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -32,10 +32,13 @@ FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
 SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
 
+PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1"
+PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1"
 PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
 COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
 COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"