You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:24 UTC

[5/5] beam git commit: Streaming fixes.

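Streaming fixes. This commit does three things:
- It gives the DirectRunner's streaming placeholders (_StreamingGroupByKeyOnly and _StreamingGroupAlsoByWindow) their own Runner API URNs and to/from_runner_api_parameter methods. This is needed because of apply overloads.
- It adds the same methods to _GroupAlsoByWindow, under the new GROUP_ALSO_BY_WINDOW_TRANSFORM URN.
- It asserts that ParDo.to_runner_api_parameter is invoked only on ParDo itself, not on a subclass.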


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0c3374
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0c3374
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0c3374

Branch: refs/heads/master
Commit: 3c0c337401ca164093c06df5b9021106ba7e5eb9
Parents: 7d64b76
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:01:49 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py | 27 ++++++++++++++++++--
 sdks/python/apache_beam/transforms/core.py      | 13 ++++++++++
 sdks/python/apache_beam/utils/urns.py           |  1 +
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1a94b3d..7a88d0e 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,6 +26,8 @@ from __future__ import absolute_import
 import collections
 import logging
 
+from google.protobuf import wrappers_pb2
+
 import apache_beam as beam
 from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
@@ -35,6 +37,7 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.core import _GroupAlsoByWindow
 from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.options.pipeline_options import DirectOptions
@@ -54,14 +57,34 @@ V = typehints.TypeVariable('V')
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
 class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
   """Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
-  pass
+  urn = "direct_runner:streaming_gbko:v0.1"
+
+  # These are needed due to apply overloads.
+  def to_runner_api_parameter(self, unused_context):
+    return _StreamingGroupByKeyOnly.urn, None
+
+  @PTransform.register_urn(urn, None)
+  def from_runner_api_parameter(unused_payload, unused_context):
+    return _StreamingGroupByKeyOnly()
 
 
 @typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
 class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
   """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
-  pass
+  urn = "direct_runner:streaming_gabw:v0.1"
+
+  # These are needed due to apply overloads.
+  def to_runner_api_parameter(self, context):
+    return (
+        _StreamingGroupAlsoByWindow.urn,
+        wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+            self.windowing)))
+
+  @PTransform.register_urn(urn, wrappers_pb2.BytesValue)
+  def from_runner_api_parameter(payload, context):
+    return _StreamingGroupAlsoByWindow(
+        context.windowing_strategies.get_by_id(payload.value))
 
 
 class DirectRunner(PipelineRunner):

http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9168a89..671fea4 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -698,6 +698,7 @@ class ParDo(PTransformWithSideInputs):
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
   def to_runner_api_parameter(self, context):
+    assert self.__class__ is ParDo
     return (
         urns.PARDO_TRANSFORM,
         beam_runner_api_pb2.ParDoPayload(
@@ -1210,6 +1211,18 @@ class _GroupAlsoByWindow(ParDo):
     self._check_pcollection(pcoll)
     return pvalue.PCollection(pcoll.pipeline)
 
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.GROUP_ALSO_BY_WINDOW_TRANSFORM,
+        wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id(
+            self.windowing)))
+
+  @PTransform.register_urn(
+      urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue)
+  def from_runner_api_parameter(payload, context):
+    return _GroupAlsoByWindow(
+        context.windowing_strategies.get_by_id(payload.value))
+
 
 class _GroupAlsoByWindowDoFn(DoFn):
   # TODO(robertwb): Support combiner lifting.

http://git-wip-us.apache.org/repos/asf/beam/blob/3c0c3374/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index dcdf0f3..e7ef80b 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,7 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
 COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
 COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"