You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:20 UTC
[1/5] beam git commit: Closes #3650
Repository: beam
Updated Branches:
refs/heads/master 53c2c8fde -> ba5e31466
Closes #3650
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba5e3146
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba5e3146
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba5e3146
Branch: refs/heads/master
Commit: ba5e314664563f23ac8c5125b89a12ddbbd4e433
Parents: 53c2c8f 2ee7422
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Aug 1 17:32:59 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 14 ++++-
.../apache_beam/runners/direct/direct_runner.py | 27 +++++++-
.../apache_beam/runners/pipeline_context.py | 8 +--
sdks/python/apache_beam/transforms/core.py | 65 +++++++++++++++++++-
sdks/python/apache_beam/utils/urns.py | 6 ++
5 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[5/5] beam git commit: Streaming fixes.
Posted by ro...@apache.org.
Streaming fixes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0c3374
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0c3374
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0c3374
Branch: refs/heads/master
Commit: 3c0c337401ca164093c06df5b9021106ba7e5eb9
Parents: 7d64b76
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:01:49 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 27 ++++++++++++++++++--
sdks/python/apache_beam/transforms/core.py | 13 ++++++++++
sdks/python/apache_beam/utils/urns.py | 1 +
3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1a94b3d..7a88d0e 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,6 +26,8 @@ from __future__ import absolute_import
import collections
import logging
+from google.protobuf import wrappers_pb2
+
import apache_beam as beam
from apache_beam import typehints
from apache_beam.metrics.execution import MetricsEnvironment
@@ -35,6 +37,7 @@ from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.core import _GroupAlsoByWindow
from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.options.pipeline_options import DirectOptions
@@ -54,14 +57,34 @@ V = typehints.TypeVariable('V')
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
"""Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
- pass
+ urn = "direct_runner:streaming_gbko:v0.1"
+
+ # These are needed due to apply overloads.
+ def to_runner_api_parameter(self, unused_context):
+ return _StreamingGroupByKeyOnly.urn, None
+
+ @PTransform.register_urn(urn, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return _StreamingGroupByKeyOnly()
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
"""Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
- pass
+ urn = "direct_runner:streaming_gabw:v0.1"
+
+ # These are needed due to apply overloads.
+ def to_runner_api_parameter(self, context):
+ return (
+ _StreamingGroupAlsoByWindow.urn,
+ wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+ self.windowing)))
+
+ @PTransform.register_urn(urn, wrappers_pb2.BytesValue)
+ def from_runner_api_parameter(payload, context):
+ return _StreamingGroupAlsoByWindow(
+ context.windowing_strategies.get_by_id(payload.value))
class DirectRunner(PipelineRunner):
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9168a89..671fea4 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -698,6 +698,7 @@ class ParDo(PTransformWithSideInputs):
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
def to_runner_api_parameter(self, context):
+ assert self.__class__ is ParDo
return (
urns.PARDO_TRANSFORM,
beam_runner_api_pb2.ParDoPayload(
@@ -1210,6 +1211,18 @@ class _GroupAlsoByWindow(ParDo):
self._check_pcollection(pcoll)
return pvalue.PCollection(pcoll.pipeline)
+ def to_runner_api_parameter(self, context):
+ return (
+ urns.GROUP_ALSO_BY_WINDOW_TRANSFORM,
+ wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+ self.windowing)))
+
+ @PTransform.register_urn(
+ urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue)
+ def from_runner_api_parameter(payload, context):
+ return _GroupAlsoByWindow(
+ context.windowing_strategies.get_by_id(payload.value))
+
class _GroupAlsoByWindowDoFn(DoFn):
# TODO(robertwb): Support combiner lifting.
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index dcdf0f3..e7ef80b 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,7 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
[3/5] beam git commit: Translate ParDo operations through the Runner API.
Posted by ro...@apache.org.
Translate ParDo operations through the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d64b769
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d64b769
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d64b769
Branch: refs/heads/master
Commit: 7d64b769a7099e767a81c693294b5a66791a441b
Parents: 53c2c8f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 16:30:56 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 11 +++++++
sdks/python/apache_beam/transforms/core.py | 38 ++++++++++++++++++++++++-
sdks/python/apache_beam/utils/urns.py | 3 ++
3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index fe36d85..8553f7c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -66,6 +66,7 @@ from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
from apache_beam.utils.annotations import deprecated
+from apache_beam.utils import urns
__all__ = ['Pipeline']
@@ -474,6 +475,12 @@ class Pipeline(object):
return str, ('Pickled pipeline stub.',)
def _verify_runner_api_compatible(self):
+ if self._options.view_as(TypeOptions).runtime_type_check:
+ # This option is incompatible with the runner API as it requires
+ # the runner to inspect non-serialized hints on the transform
+ # itself.
+ return False
+
class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment
ok = True # Really a nonlocal.
@@ -745,6 +752,10 @@ class AppliedPTransform(object):
result.outputs = {
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
for tag, id in proto.outputs.items()}
+ # This annotation is expected by some runners.
+ if proto.spec.urn == urns.PARDO_TRANSFORM:
+ result.transform.output_tags = set(proto.outputs.keys()).difference(
+ {'None'})
if not result.parts:
for tag, pc in result.outputs.items():
if pc not in result.inputs:
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 25fe39f..9168a89 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -23,10 +23,13 @@ import copy
import inspect
import types
+from google.protobuf import wrappers_pb2
+
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam import coders
from apache_beam.coders import typecoders
+from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
@@ -50,6 +53,7 @@ from apache_beam.typehints.decorators import TypeCheckError
from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.typehints.typehints import is_consistent_with
+from apache_beam.utils import proto_utils
from apache_beam.utils import urns
from apache_beam.options.pipeline_options import TypeOptions
@@ -136,7 +140,7 @@ class DoFnProcessContext(DoFnContext):
self.windows = windowed_value.windows
-class DoFn(WithTypeHints, HasDisplayData):
+class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""A function object used by a transform with custom processing.
The ParDo transform is such a transform. The ParDo.apply
@@ -236,6 +240,8 @@ class DoFn(WithTypeHints, HasDisplayData):
return False # Method is a classmethod
return True
+ urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_DO_FN)
+
def _fn_takes_side_inputs(fn):
try:
@@ -686,6 +692,36 @@ class ParDo(PTransformWithSideInputs):
raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
return _MultiParDo(self, tags, main_tag)
+ def _pardo_fn_data(self):
+ si_tags_and_types = []
+ windowing = None
+ return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
+
+ def to_runner_api_parameter(self, context):
+ return (
+ urns.PARDO_TRANSFORM,
+ beam_runner_api_pb2.ParDoPayload(
+ do_fn=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=urns.PICKLED_DO_FN_INFO,
+ parameter=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=pickler.dumps(
+ self._pardo_fn_data())))))))
+
+ @PTransform.register_urn(
+ urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
+ def from_runner_api_parameter(pardo_payload, context):
+ assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
+ fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
+ proto_utils.unpack_Any(
+ pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value)
+ if si_tags_and_types:
+ raise NotImplementedError('deferred side inputs')
+ elif windowing:
+ raise NotImplementedError('explicit windowing')
+ return ParDo(fn, *args, **kwargs)
+
class _MultiParDo(PTransform):
http://git-wip-us.apache.org/repos/asf/beam/blob/7d64b769/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 7110802..dcdf0f3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -32,10 +32,13 @@ FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
+PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1"
+PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1"
PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
[2/5] beam git commit: Translate GroupByKey[Only] through the Runner API.
Posted by ro...@apache.org.
Translate GroupByKey[Only] through the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b179eca9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b179eca9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b179eca9
Branch: refs/heads/master
Commit: b179eca90f2af262aed637a2a0c099680a0822d2
Parents: 3c0c337
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:20 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 14 ++++++++++++++
sdks/python/apache_beam/utils/urns.py | 2 ++
2 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 671fea4..cff6dbe 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1184,6 +1184,13 @@ class GroupByKey(PTransform):
| 'GroupByKey' >> _GroupByKeyOnly()
| 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
+ def to_runner_api_parameter(self, unused_context):
+ return urns.GROUP_BY_KEY_TRANSFORM, None
+
+ @PTransform.register_urn(urns.GROUP_BY_KEY_TRANSFORM, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return GroupByKey()
+
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
@@ -1197,6 +1204,13 @@ class _GroupByKeyOnly(PTransform):
self._check_pcollection(pcoll)
return pvalue.PCollection(pcoll.pipeline)
+ def to_runner_api_parameter(self, unused_context):
+ return urns.GROUP_BY_KEY_ONLY_TRANSFORM, None
+
+ @PTransform.register_urn(urns.GROUP_BY_KEY_ONLY_TRANSFORM, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return _GroupByKeyOnly()
+
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index e7ef80b..0013cb3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,8 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
+GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
[4/5] beam git commit: More informative references in the proto representation.
Posted by ro...@apache.org.
More informative references in the proto representation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ee7422f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ee7422f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ee7422f
Branch: refs/heads/master
Commit: 2ee7422f1dbd00e3c40ac95e9eaddc745d46fe65
Parents: b179eca
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 3 ++-
sdks/python/apache_beam/runners/pipeline_context.py | 8 ++++----
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8553f7c..e7c2322 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -730,7 +730,8 @@ class AppliedPTransform(object):
return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
spec=transform_to_runner_api(self.transform, context),
- subtransforms=[context.transforms.get_id(part) for part in self.parts],
+ subtransforms=[context.transforms.get_id(part, label=part.full_label)
+ for part in self.parts],
# TODO(BEAM-115): Side inputs.
inputs={tag: context.pcollections.get_id(pc)
for tag, pc in self.named_inputs().items()},
http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index a40069b..f4de42a 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -43,18 +43,18 @@ class _PipelineContextMap(object):
self._id_to_proto = proto_map if proto_map else {}
self._counter = 0
- def _unique_ref(self, obj=None):
+ def _unique_ref(self, obj=None, label=None):
self._counter += 1
return "ref_%s_%s_%s" % (
- self._obj_type.__name__, type(obj).__name__, self._counter)
+ self._obj_type.__name__, label or type(obj).__name__, self._counter)
def populate_map(self, proto_map):
for id, proto in self._id_to_proto.items():
proto_map[id].CopyFrom(proto)
- def get_id(self, obj):
+ def get_id(self, obj, label=None):
if obj not in self._obj_to_id:
- id = self._unique_ref(obj)
+ id = self._unique_ref(obj, label)
self._id_to_obj[id] = obj
self._obj_to_id[obj] = id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)