You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/23 19:39:13 UTC
[beam] branch master updated: [BEAM-7023] WithKeys PTransform for Python SDK
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d68bac [BEAM-7023] WithKeys PTransform for Python SDK
new f0ed4a9 Merge pull request #8663 from ttanay/beam-7023
8d68bac is described below
commit 8d68bac1e8d04028ead7eb72de046b1ae90a6507
Author: ttanay <tt...@gmail.com>
AuthorDate: Thu May 23 18:41:10 2019 +0530
[BEAM-7023] WithKeys PTransform for Python SDK
---
sdks/python/apache_beam/transforms/util.py | 13 +++++++++++++
sdks/python/apache_beam/transforms/util_test.py | 18 ++++++++++++++++++
2 files changed, 31 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 64dce9d..43d3cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -62,6 +62,7 @@ __all__ = [
'RemoveDuplicates',
'Reshuffle',
'Values',
+ 'WithKeys'
]
K = typehints.TypeVariable('K')
@@ -653,3 +654,15 @@ class Reshuffle(PTransform):
@PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None)
def from_runner_api_parameter(unused_parameter, unused_context):
return Reshuffle()
+
+
+@ptransform_fn
+def WithKeys(pcoll, k):
+  """Pairs each element of a PCollection with a key, yielding (key, value).
+
+  If ``k`` is callable it is applied to every element to compute that
+  element's key; any other ``k`` is used as the constant key for all elements.
+  """
+  if not callable(k):
+    return pcoll | Map(lambda element: (k, element))
+  return pcoll | Map(lambda element: (k(element), element))
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 31a44840..68a3c03 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -414,6 +414,24 @@ class ReshuffleTest(unittest.TestCase):
pipeline.run()
+class WithKeysTest(unittest.TestCase):
+  """WithKeys pairs each element with a constant or a computed key."""
+  def setUp(self):
+    self.elements = [1, 2, 3]
+
+  def test_constant_k(self):
+    with TestPipeline() as pipeline:
+      keyed = pipeline | beam.Create(self.elements) | util.WithKeys('k')
+      assert_that(keyed, equal_to([('k', 1), ('k', 2), ('k', 3)]))
+
+  def test_callable_k(self):
+    with TestPipeline() as pipeline:
+      keyed = (pipeline
+               | beam.Create(self.elements)
+               | util.WithKeys(lambda x: x * x))
+      assert_that(keyed, equal_to([(1, 1), (4, 2), (9, 3)]))
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()