You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/23 19:39:13 UTC

[beam] branch master updated: [BEAM-7023] WithKeys PTransform for Python SDK

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d68bac  [BEAM-7023] WithKeys PTransform for Python SDK
     new f0ed4a9  Merge pull request #8663 from ttanay/beam-7023
8d68bac is described below

commit 8d68bac1e8d04028ead7eb72de046b1ae90a6507
Author: ttanay <tt...@gmail.com>
AuthorDate: Thu May 23 18:41:10 2019 +0530

    [BEAM-7023] WithKeys PTransform for Python SDK
---
 sdks/python/apache_beam/transforms/util.py      | 13 +++++++++++++
 sdks/python/apache_beam/transforms/util_test.py | 18 ++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 64dce9d..43d3cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -62,6 +62,7 @@ __all__ = [
     'RemoveDuplicates',
     'Reshuffle',
     'Values',
+    'WithKeys'
     ]
 
 K = typehints.TypeVariable('K')
@@ -653,3 +654,15 @@ class Reshuffle(PTransform):
   @PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None)
   def from_runner_api_parameter(unused_parameter, unused_context):
     return Reshuffle()
+
+
+@ptransform_fn
+def WithKeys(pcoll, k):
+  """Pairs each element of a PCollection with a key.
+
+  ``k`` is either a constant key or a callable that computes the key from
+  each value; the output is a PCollection of ``(key, value)`` tuples. Any
+  callable ``k`` is invoked per element rather than used as a constant key.
+  """
+  key_of = k if callable(k) else (lambda unused_value: k)
+  return pcoll | Map(lambda value: (key_of(value), value))
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 31a44840..68a3c03 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -414,6 +414,24 @@ class ReshuffleTest(unittest.TestCase):
     pipeline.run()
 
 
+class WithKeysTest(unittest.TestCase):
+  """Tests for util.WithKeys; apply assert_that inside the `with` block."""
+  def setUp(self):
+    self.values = [1, 2, 3]
+
+  def test_constant_k(self):
+    with TestPipeline() as p:
+      pc = p | beam.Create(self.values)
+      with_keys = pc | util.WithKeys('k')
+      assert_that(with_keys, equal_to([('k', 1), ('k', 2), ('k', 3)]))
+
+  def test_callable_k(self):
+    with TestPipeline() as p:
+      pc = p | beam.Create(self.values)
+      with_keys = pc | util.WithKeys(lambda x: x * x)
+      assert_that(with_keys, equal_to([(1, 1), (4, 2), (9, 3)]))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()