You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:21 UTC
[2/5] beam git commit: Translate GroupByKey[Only] through the Runner API.
Translate GroupByKey[Only] through the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b179eca9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b179eca9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b179eca9
Branch: refs/heads/master
Commit: b179eca90f2af262aed637a2a0c099680a0822d2
Parents: 3c0c337
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:20 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 14 ++++++++++++++
sdks/python/apache_beam/utils/urns.py | 2 ++
2 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 671fea4..cff6dbe 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1184,6 +1184,13 @@ class GroupByKey(PTransform):
| 'GroupByKey' >> _GroupByKeyOnly()
| 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
+ def to_runner_api_parameter(self, unused_context):
+ return urns.GROUP_BY_KEY_TRANSFORM, None
+
+ @PTransform.register_urn(urns.GROUP_BY_KEY_TRANSFORM, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return GroupByKey()
+
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
@@ -1197,6 +1204,13 @@ class _GroupByKeyOnly(PTransform):
self._check_pcollection(pcoll)
return pvalue.PCollection(pcoll.pipeline)
+ def to_runner_api_parameter(self, unused_context):
+ return urns.GROUP_BY_KEY_ONLY_TRANSFORM, None
+
+ @PTransform.register_urn(urns.GROUP_BY_KEY_ONLY_TRANSFORM, None)
+ def from_runner_api_parameter(unused_payload, unused_context):
+ return _GroupByKeyOnly()
+
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index e7ef80b..0013cb3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,8 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
+GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"