You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/04 22:37:25 UTC

[beam] branch master updated: [BEAM-7388] Reify PTransform for Python SDK

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ede2b3  [BEAM-7388] Reify PTransform for Python SDK
     new 9fdf6f7  Merge pull request #8717 from ttanay/reify-python
2ede2b3 is described below

commit 2ede2b3d54360b0f6dfbd5ab96ea9cd1a7779872
Author: ttanay <tt...@gmail.com>
AuthorDate: Tue May 28 23:25:19 2019 +0530

    [BEAM-7388] Reify PTransform for Python SDK
---
 sdks/python/apache_beam/transforms/util.py      | 64 +++++++++++++++++++++++
 sdks/python/apache_beam/transforms/util_test.py | 68 +++++++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 3876773..289d8bf 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -59,6 +59,7 @@ __all__ = [
     'Distinct',
     'Keys',
     'KvSwap',
+    'Reify',
     'RemoveDuplicates',
     'Reshuffle',
     'ToString',
@@ -724,3 +725,66 @@ class ToString(object):
           Map(lambda x: self.delimiter.join(str(_x) for _x in x)))
                        .with_input_types(input_type)
                        .with_output_types(output_type)))
+
+
+class Reify(object):
+  """PTransforms that make implicit element metadata (timestamp and window)
+  explicit by copying it into the element itself."""
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(T)
+  class Timestamp(PTransform):
+    """PTransform to wrap a value in a TimestampedValue with its
+    associated timestamp."""
+
+    @staticmethod
+    def add_timestamp_info(element, timestamp=DoFn.TimestampParam):
+      yield TimestampedValue(element, timestamp)
+
+    def expand(self, pcoll):
+      return pcoll | ParDo(self.add_timestamp_info)
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(T)
+  class Window(PTransform):
+    """PTransform to convert an element in a PCollection into a tuple of
+    (element, timestamp, window), wrapped in a TimestampedValue with its
+    associated timestamp."""
+
+    @staticmethod
+    def add_window_info(element, timestamp=DoFn.TimestampParam,
+                        window=DoFn.WindowParam):
+      yield TimestampedValue((element, timestamp, window), timestamp)
+
+    def expand(self, pcoll):
+      return pcoll | ParDo(self.add_window_info)
+
+  @typehints.with_input_types(typehints.KV[K, V])
+  @typehints.with_output_types(typehints.KV[K, V])
+  class TimestampInValue(PTransform):
+    """PTransform to wrap the value of a KV pair in a TimestampedValue
+    carrying the element's timestamp; the key is left unchanged."""
+
+    @staticmethod
+    def add_timestamp_info(element, timestamp=DoFn.TimestampParam):
+      key, value = element
+      yield (key, TimestampedValue(value, timestamp))
+
+    def expand(self, pcoll):
+      return pcoll | ParDo(self.add_timestamp_info)
+
+  @typehints.with_input_types(typehints.KV[K, V])
+  @typehints.with_output_types(typehints.KV[K, V])
+  class WindowInValue(PTransform):
+    """PTransform to convert the value of a KV pair into a tuple of
+    (value, timestamp, window); the resulting (key, tuple) element is wrapped
+    in a TimestampedValue with the element's timestamp."""
+
+    @staticmethod
+    def add_window_info(element, timestamp=DoFn.TimestampParam,
+                        window=DoFn.WindowParam):
+      key, value = element
+      yield TimestampedValue((key, (value, timestamp, window)), timestamp)
+
+    def expand(self, pcoll):
+      return pcoll | ParDo(self.add_window_info)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 419cf7d..3a18487 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -465,6 +465,74 @@ class ToStringTest(unittest.TestCase):
       assert_that(result, equal_to(["one\t1", "two\t2"]))
 
 
+class ReifyTest(unittest.TestCase):
+
+  def test_timestamp(self):
+    l = [TimestampedValue('a', 100),
+         TimestampedValue('b', 200),
+         TimestampedValue('c', 300)]
+    expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
+                TestWindowedValue('b', 200, [GlobalWindow()]),
+                TestWindowedValue('c', 300, [GlobalWindow()])]
+    with TestPipeline() as p:
+      # Map(lambda x: x) is added after Create because Create does not
+      # assign the timestamps carried by TimestampedValue inputs; on its own
+      # it would produce a PCollection of TimestampedValue(element, timestamp)
+      # objects. Passing the elements through a DoFn instead produces a
+      # PCollection of the bare elements with those timestamps assigned to
+      # them.
+      pc = p | beam.Create(l) | beam.Map(lambda x: x)
+      reified_pc = pc | util.Reify.Timestamp()
+      assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+  def test_window(self):
+    l = [GlobalWindows.windowed_value('a', 100),
+         GlobalWindows.windowed_value('b', 200),
+         GlobalWindows.windowed_value('c', 300)]
+    expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('b', 200, GlobalWindow()), 200,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('c', 300, GlobalWindow()), 300,
+                                  [GlobalWindow()])]
+    with TestPipeline() as p:
+      pc = p | beam.Create(l)
+      reified_pc = pc | util.Reify.Window()
+      assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+  def test_timestamp_in_value(self):
+    l = [TimestampedValue(('a', 1), 100),
+         TimestampedValue(('b', 2), 200),
+         TimestampedValue(('c', 3), 300)]
+    expected = [TestWindowedValue(('a', TimestampedValue(1, 100)), 100,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('b', TimestampedValue(2, 200)), 200,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('c', TimestampedValue(3, 300)), 300,
+                                  [GlobalWindow()])]
+    with TestPipeline() as p:
+      pc = p | beam.Create(l) | beam.Map(lambda x: x)
+      reified_pc = pc | util.Reify.TimestampInValue()
+      assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+  def test_window_in_value(self):
+    l = [GlobalWindows.windowed_value(('a', 1), 100),
+         GlobalWindows.windowed_value(('b', 2), 200),
+         GlobalWindows.windowed_value(('c', 3), 300)]
+    expected = [TestWindowedValue(('a', (1, 100, GlobalWindow())), 100,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('b', (2, 200, GlobalWindow())), 200,
+                                  [GlobalWindow()]),
+                TestWindowedValue(('c', (3, 300, GlobalWindow())), 300,
+                                  [GlobalWindow()])]
+    with TestPipeline() as p:
+      # Map(lambda x: x) is needed for the same reason as in test_timestamp.
+      # It also makes the input typehint on Reify.WindowInValue work.
+      pc = p | beam.Create(l) | beam.Map(lambda x: x)
+      reified_pc = pc | util.Reify.WindowInValue()
+      assert_that(reified_pc, equal_to(expected), reify_windows=True)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()