You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/04 22:37:25 UTC
[beam] branch master updated: [BEAM-7388] Reify PTransform for Python SDK
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2ede2b3 [BEAM-7388] Reify PTransform for Python SDK
new 9fdf6f7 Merge pull request #8717 from ttanay/reify-python
2ede2b3 is described below
commit 2ede2b3d54360b0f6dfbd5ab96ea9cd1a7779872
Author: ttanay <tt...@gmail.com>
AuthorDate: Tue May 28 23:25:19 2019 +0530
[BEAM-7388] Reify PTransform for Python SDK
---
sdks/python/apache_beam/transforms/util.py | 64 +++++++++++++++++++++++
sdks/python/apache_beam/transforms/util_test.py | 68 +++++++++++++++++++++++++
2 files changed, 132 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 3876773..289d8bf 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -59,6 +59,7 @@ __all__ = [
'Distinct',
'Keys',
'KvSwap',
+ 'Reify',
'RemoveDuplicates',
'Reshuffle',
'ToString',
@@ -724,3 +725,66 @@ class ToString(object):
Map(lambda x: self.delimiter.join(str(_x) for _x in x)))
.with_input_types(input_type)
.with_output_types(output_type)))
+
+
+ class Reify(object):
+ """PTransforms for converting between explicit and implicit form of various
+ Beam values.
+
+ Groups four nested PTransforms: Timestamp, Window, TimestampInValue and
+ WindowInValue. Each one expands to a single ParDo over a static generator
+ that receives the element together with DoFn.TimestampParam and, for the
+ window variants, DoFn.WindowParam.
+ """
+
+ # NOTE(review): the declared output type is T, the same as the input,
+ # although add_timestamp_info yields a TimestampedValue.
+ # ReifyTest.test_timestamp expects the bare element carrying its timestamp,
+ # which suggests the wrapper is consumed when emitted from the ParDo --
+ # confirm.
+ @typehints.with_input_types(T)
+ @typehints.with_output_types(T)
+ class Timestamp(PTransform):
+ """PTransform to wrap a value in a TimestampedValue with its
+ associated timestamp."""
+
+ @staticmethod
+ def add_timestamp_info(element, timestamp=DoFn.TimestampParam):
+ # timestamp defaults to DoFn.TimestampParam; presumably the runner
+ # substitutes the element's own timestamp -- TODO confirm.
+ yield TimestampedValue(element, timestamp)
+
+ def expand(self, pcoll):
+ return pcoll | ParDo(self.add_timestamp_info)
+
+ # NOTE(review): the declared output type is T, yet add_window_info yields
+ # (element, timestamp, window) tuples -- confirm the hint is intended.
+ @typehints.with_input_types(T)
+ @typehints.with_output_types(T)
+ class Window(PTransform):
+ """PTransform to convert an element in a PCollection into a tuple of
+ (element, timestamp, window), wrapped in a TimestampedValue with its
+ associated timestamp."""
+
+ @staticmethod
+ def add_window_info(element, timestamp=DoFn.TimestampParam,
+ window=DoFn.WindowParam):
+ # The timestamp appears twice on purpose: once inside the tuple and once
+ # as the timestamp of the wrapping TimestampedValue.
+ yield TimestampedValue((element, timestamp, window), timestamp)
+
+ def expand(self, pcoll):
+ return pcoll | ParDo(self.add_window_info)
+
+ # NOTE(review): input and output are both declared as KV[K, V], while the
+ # emitted value is a TimestampedValue wrapping the original value --
+ # confirm the hint is intended.
+ @typehints.with_input_types(typehints.KV[K, V])
+ @typehints.with_output_types(typehints.KV[K, V])
+ class TimestampInValue(PTransform):
+ """PTransform to wrap the Value in a KV pair in a TimestampedValue with
+ the element's associated timestamp."""
+
+ @staticmethod
+ def add_timestamp_info(element, timestamp=DoFn.TimestampParam):
+ # The element is unpacked as a 2-item (key, value) pair; only the value
+ # is wrapped, the key is passed through unchanged.
+ key, value = element
+ yield (key, TimestampedValue(value, timestamp))
+
+ def expand(self, pcoll):
+ return pcoll | ParDo(self.add_timestamp_info)
+
+ # NOTE(review): input and output are both declared as KV[K, V], while the
+ # emitted value is a (value, timestamp, window) tuple -- confirm the hint
+ # is intended.
+ @typehints.with_input_types(typehints.KV[K, V])
+ @typehints.with_output_types(typehints.KV[K, V])
+ class WindowInValue(PTransform):
+ """PTransform to convert the Value in a KV pair into a tuple of
+ (value, timestamp, window), with the whole element being wrapped inside a
+ TimestampedValue."""
+
+ @staticmethod
+ def add_window_info(element, timestamp=DoFn.TimestampParam,
+ window=DoFn.WindowParam):
+ # The element is unpacked as a 2-item (key, value) pair; the key stays
+ # outside the (value, timestamp, window) tuple.
+ key, value = element
+ yield TimestampedValue((key, (value, timestamp, window)), timestamp)
+
+ def expand(self, pcoll):
+ return pcoll | ParDo(self.add_window_info)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 419cf7d..3a18487 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -465,6 +465,74 @@ class ToStringTest(unittest.TestCase):
assert_that(result, equal_to(["one\t1", "two\t2"]))
+class ReifyTest(unittest.TestCase):
+ """Tests for the four PTransforms nested in util.Reify.
+
+ Every test builds three input elements with timestamps 100, 200 and 300,
+ applies one Reify transform and compares the result against
+ TestWindowedValue expectations in the GlobalWindow, passing
+ reify_windows=True to assert_that.
+ """
+
+ # Reify.Timestamp: each output is expected to be the bare value with its
+ # original timestamp.
+ def test_timestamp(self):
+ l = [TimestampedValue('a', 100),
+ TimestampedValue('b', 200),
+ TimestampedValue('c', 300)]
+ expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
+ TestWindowedValue('b', 200, [GlobalWindow()]),
+ TestWindowedValue('c', 300, [GlobalWindow()])]
+ with TestPipeline() as p:
+ # Map(lambda x: x) PTransform is added after Create here, because when
+ # a PCollection of TimestampedValues is created with Create PTransform,
+ # the timestamps are not assigned to it. Adding a Map forces the
+ # PCollection to go through a DoFn so that the PCollection consists of
+ # the elements with timestamps assigned to them instead of a PCollection
+ # of TimestampedValue(element, timestamp).
+ pc = p | beam.Create(l) | beam.Map(lambda x: x)
+ reified_pc = pc | util.Reify.Timestamp()
+ assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+ # Reify.Window: each output value is expected to be the tuple
+ # (element, timestamp, window), still carrying the original timestamp.
+ def test_window(self):
+ l = [GlobalWindows.windowed_value('a', 100),
+ GlobalWindows.windowed_value('b', 200),
+ GlobalWindows.windowed_value('c', 300)]
+ expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
+ [GlobalWindow()]),
+ TestWindowedValue(('b', 200, GlobalWindow()), 200,
+ [GlobalWindow()]),
+ TestWindowedValue(('c', 300, GlobalWindow()), 300,
+ [GlobalWindow()])]
+ with TestPipeline() as p:
+ # Unlike the other tests, no Map(lambda x: x) step follows Create here.
+ pc = p | beam.Create(l)
+ reified_pc = pc | util.Reify.Window()
+ assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+ # Reify.TimestampInValue: the key is expected to stay as is while the value
+ # becomes TimestampedValue(value, timestamp).
+ def test_timestamp_in_value(self):
+ l = [TimestampedValue(('a', 1), 100),
+ TimestampedValue(('b', 2), 200),
+ TimestampedValue(('c', 3), 300)]
+ expected = [TestWindowedValue(('a', TimestampedValue(1, 100)), 100,
+ [GlobalWindow()]),
+ TestWindowedValue(('b', TimestampedValue(2, 200)), 200,
+ [GlobalWindow()]),
+ TestWindowedValue(('c', TimestampedValue(3, 300)), 300,
+ [GlobalWindow()])]
+ with TestPipeline() as p:
+ # As in test_timestamp, Map(lambda x: x) sends the created
+ # TimestampedValues through a DoFn so the timestamps get assigned.
+ pc = p | beam.Create(l) | beam.Map(lambda x: x)
+ reified_pc = pc | util.Reify.TimestampInValue()
+ assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+ # Reify.WindowInValue: the key is expected to stay as is while the value
+ # becomes the tuple (value, timestamp, window).
+ def test_window_in_value(self):
+ l = [GlobalWindows.windowed_value(('a', 1), 100),
+ GlobalWindows.windowed_value(('b', 2), 200),
+ GlobalWindows.windowed_value(('c', 3), 300)]
+ expected = [TestWindowedValue(('a', (1, 100, GlobalWindow())), 100,
+ [GlobalWindow()]),
+ TestWindowedValue(('b', (2, 200, GlobalWindow())), 200,
+ [GlobalWindow()]),
+ TestWindowedValue(('c', (3, 300, GlobalWindow())), 300,
+ [GlobalWindow()])]
+ with TestPipeline() as p:
+ # As in test_timestamp, Map(lambda x: x) forces the created elements
+ # through a DoFn before they are reified.
+ # Also, this makes the typehint on Reify.WindowInValue work.
+ pc = p | beam.Create(l) | beam.Map(lambda x: x)
+ reified_pc = pc | util.Reify.WindowInValue()
+ assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+
+
+
# When run as a script, set the root logger to INFO and run the unittest suite.
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()