You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:24 UTC
[5/5] beam git commit: Streaming fixes.
Streaming fixes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0c3374
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0c3374
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0c3374
Branch: refs/heads/master
Commit: 3c0c337401ca164093c06df5b9021106ba7e5eb9
Parents: 7d64b76
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:01:49 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 27 ++++++++++++++++++--
sdks/python/apache_beam/transforms/core.py | 13 ++++++++++
sdks/python/apache_beam/utils/urns.py | 1 +
3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1a94b3d..7a88d0e 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,6 +26,8 @@ from __future__ import absolute_import
import collections
import logging
+from google.protobuf import wrappers_pb2
+
import apache_beam as beam
from apache_beam import typehints
from apache_beam.metrics.execution import MetricsEnvironment
@@ -35,6 +37,7 @@ from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.core import _GroupAlsoByWindow
from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.options.pipeline_options import DirectOptions
@@ -54,14 +57,34 @@ V = typehints.TypeVariable('V')
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
"""Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
- pass
+ urn = "direct_runner:streaming_gbko:v0.1"
+
+ # These are needed due to apply overloads.
+ def to_runner_api_parameter(self, unused_context):
+ return _StreamingGroupByKeyOnly.urn, None
+
+ @PTransform.register_urn(urn, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return _StreamingGroupByKeyOnly()
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
"""Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
- pass
+ urn = "direct_runner:streaming_gabw:v0.1"
+
+ # These are needed due to apply overloads.
+ def to_runner_api_parameter(self, context):
+ return (
+ _StreamingGroupAlsoByWindow.urn,
+ wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+ self.windowing)))
+
+ @PTransform.register_urn(urn, wrappers_pb2.BytesValue)
+ def from_runner_api_parameter(payload, context):
+ return _StreamingGroupAlsoByWindow(
+ context.windowing_strategies.get_by_id(payload.value))
class DirectRunner(PipelineRunner):
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9168a89..671fea4 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -698,6 +698,7 @@ class ParDo(PTransformWithSideInputs):
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
def to_runner_api_parameter(self, context):
+ assert self.__class__ is ParDo
return (
urns.PARDO_TRANSFORM,
beam_runner_api_pb2.ParDoPayload(
@@ -1210,6 +1211,18 @@ class _GroupAlsoByWindow(ParDo):
self._check_pcollection(pcoll)
return pvalue.PCollection(pcoll.pipeline)
+ def to_runner_api_parameter(self, context):
+ return (
+ urns.GROUP_ALSO_BY_WINDOW_TRANSFORM,
+ wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+ self.windowing)))
+
+ @PTransform.register_urn(
+ urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue)
+ def from_runner_api_parameter(payload, context):
+ return _GroupAlsoByWindow(
+ context.windowing_strategies.get_by_id(payload.value))
+
class _GroupAlsoByWindowDoFn(DoFn):
# TODO(robertwb): Support combiner lifting.
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index dcdf0f3..e7ef80b 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,7 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"