You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:22 UTC
[3/5] beam git commit: Translate ParDo operations through the Runner API.
Translate ParDo operations through the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d64b769
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d64b769
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d64b769
Branch: refs/heads/master
Commit: 7d64b769a7099e767a81c693294b5a66791a441b
Parents: 53c2c8f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 16:30:56 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 11 +++++++
sdks/python/apache_beam/transforms/core.py | 38 ++++++++++++++++++++++++-
sdks/python/apache_beam/utils/urns.py | 3 ++
3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index fe36d85..8553f7c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -66,6 +66,7 @@ from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
from apache_beam.utils.annotations import deprecated
+from apache_beam.utils import urns
__all__ = ['Pipeline']
@@ -474,6 +475,12 @@ class Pipeline(object):
return str, ('Pickled pipeline stub.',)
def _verify_runner_api_compatible(self):
+ if self._options.view_as(TypeOptions).runtime_type_check:
+ # This option is incompatible with the runner API as it requires
+ # the runner to inspect non-serialized hints on the transform
+ # itself.
+ return False
+
class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment
ok = True # Really a nonlocal.
@@ -745,6 +752,10 @@ class AppliedPTransform(object):
result.outputs = {
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
for tag, id in proto.outputs.items()}
+ # This annotation is expected by some runners.
+ if proto.spec.urn == urns.PARDO_TRANSFORM:
+ result.transform.output_tags = set(proto.outputs.keys()).difference(
+ {'None'})
if not result.parts:
for tag, pc in result.outputs.items():
if pc not in result.inputs:
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 25fe39f..9168a89 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -23,10 +23,13 @@ import copy
import inspect
import types
+from google.protobuf import wrappers_pb2
+
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam import coders
from apache_beam.coders import typecoders
+from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
@@ -50,6 +53,7 @@ from apache_beam.typehints.decorators import TypeCheckError
from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.typehints.typehints import is_consistent_with
+from apache_beam.utils import proto_utils
from apache_beam.utils import urns
from apache_beam.options.pipeline_options import TypeOptions
@@ -136,7 +140,7 @@ class DoFnProcessContext(DoFnContext):
self.windows = windowed_value.windows
-class DoFn(WithTypeHints, HasDisplayData):
+class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""A function object used by a transform with custom processing.
The ParDo transform is such a transform. The ParDo.apply
@@ -236,6 +240,8 @@ class DoFn(WithTypeHints, HasDisplayData):
return False # Method is a classmethod
return True
+ urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_DO_FN)
+
def _fn_takes_side_inputs(fn):
try:
@@ -686,6 +692,36 @@ class ParDo(PTransformWithSideInputs):
raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
return _MultiParDo(self, tags, main_tag)
+ def _pardo_fn_data(self):
+ si_tags_and_types = []
+ windowing = None
+ return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
+
+ def to_runner_api_parameter(self, context):
+ return (
+ urns.PARDO_TRANSFORM,
+ beam_runner_api_pb2.ParDoPayload(
+ do_fn=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=urns.PICKLED_DO_FN_INFO,
+ parameter=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=pickler.dumps(
+ self._pardo_fn_data())))))))
+
+ @PTransform.register_urn(
+ urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
+ def from_runner_api_parameter(pardo_payload, context):
+ assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
+ fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
+ proto_utils.unpack_Any(
+ pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value)
+ if si_tags_and_types:
+ raise NotImplementedError('deferred side inputs')
+ elif windowing:
+ raise NotImplementedError('explicit windowing')
+ return ParDo(fn, *args, **kwargs)
+
class _MultiParDo(PTransform):
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 7110802..dcdf0f3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -32,10 +32,13 @@ FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
+PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1"
+PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1"
PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"