You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:21 UTC

[2/5] beam git commit: Translate GroupByKey[Only] through the Runner API.

Translate GroupByKey[Only] through the Runner API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b179eca9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b179eca9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b179eca9

Branch: refs/heads/master
Commit: b179eca90f2af262aed637a2a0c099680a0822d2
Parents: 3c0c337
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:20 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 14 ++++++++++++++
 sdks/python/apache_beam/utils/urns.py      |  2 ++
 2 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 671fea4..cff6dbe 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1184,6 +1184,13 @@ class GroupByKey(PTransform):
               | 'GroupByKey' >> _GroupByKeyOnly()
               | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
 
+  def to_runner_api_parameter(self, unused_context):
+    return urns.GROUP_BY_KEY_TRANSFORM, None
+
+  @PTransform.register_urn(urns.GROUP_BY_KEY_TRANSFORM, None)
+  def from_runner_api_parameter(unused_payload, unused_context):
+    return GroupByKey()
+
 
 @typehints.with_input_types(typehints.KV[K, V])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
@@ -1197,6 +1204,13 @@ class _GroupByKeyOnly(PTransform):
     self._check_pcollection(pcoll)
     return pvalue.PCollection(pcoll.pipeline)
 
+  def to_runner_api_parameter(self, unused_context):
+    return urns.GROUP_BY_KEY_ONLY_TRANSFORM, None
+
+  @PTransform.register_urn(urns.GROUP_BY_KEY_ONLY_TRANSFORM, None)
+  def from_runner_api_parameter(unused_payload, unused_context):
+    return _GroupByKeyOnly()
+
 
 @typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])

http://git-wip-us.apache.org/repos/asf/beam/blob/b179eca9/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index e7ef80b..0013cb3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -39,6 +39,8 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
+GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
+GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
 GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
 COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
 COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"