You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/25 01:47:54 UTC

[beam] branch master updated: Add Python snippet for ParDo transform

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f600b  Add Python snippet for ParDo transform
     new 754a2ec  Merge pull request #8903 from davidcavazos/element-wise-pardo
08f600b is described below

commit 08f600bc34e8e31cebc25cf1e752447ed736312e
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:23:45 2019 -0700

    Add Python snippet for ParDo transform
---
 .../snippets/transforms/element_wise/pardo.py      | 83 ++++++++++++++++++++++
 .../snippets/transforms/element_wise/pardo_test.py | 77 ++++++++++++++++++++
 2 files changed, 160 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
new file mode 100644
index 0000000..d8ce5bf
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
@@ -0,0 +1,83 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def pardo_dofn(test=None):
+  # [START pardo_dofn]
+  import apache_beam as beam
+
+  class SplitWords(beam.DoFn):
+    def __init__(self, delimiter=','):
+      self.delimiter = delimiter
+
+    def process(self, text):
+      return text.split(self.delimiter)
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '🍓Strawberry,🥕Carrot,🍆Eggplant',
+            '🍅Tomato,🥔Potato',
+        ])
+        | 'Split words' >> beam.ParDo(SplitWords(','))
+        | beam.Map(print)
+    )
+    # [END pardo_dofn]
+    if test:
+      test(plants)
+
+
+def pardo_dofn_params(test=None):
+  # [START pardo_dofn_params]
+  import apache_beam as beam
+
+  # pylint: disable=line-too-long
+  class AnalyzeElement(beam.DoFn):
+    def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
+      yield '\n'.join([
+          '# timestamp',
+          'type(timestamp) -> ' + repr(type(timestamp)),
+          'timestamp.micros -> ' + repr(timestamp.micros),
+          'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
+          'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
+          '',
+          '# window',
+          'type(window) -> ' + repr(type(window)),
+          'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
+          'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
+          'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
+      ])
+  # pylint: enable=line-too-long
+
+  with beam.Pipeline() as pipeline:
+    dofn_params = (
+        pipeline
+        | 'Create a single test element' >> beam.Create([':)'])
+        | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, 1584675660))
+        | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
+        | 'Analyze element' >> beam.ParDo(AnalyzeElement())
+        | beam.Map(print)
+    )
+    # [END pardo_dofn_params]
+    if test:
+      test(dofn_params)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
new file mode 100644
index 0000000..9cb617e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
@@ -0,0 +1,77 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.pardo import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.element_wise.pardo.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class ParDoTest(unittest.TestCase):
+  def __init__(self, methodName):
+    super(ParDoTest, self).__init__(methodName)
+    # [START plants]
+    plants = [
+        '🍓Strawberry',
+        '🥕Carrot',
+        '🍆Eggplant',
+        '🍅Tomato',
+        '🥔Potato',
+    ]
+    # [END plants]
+    self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
+
+    # pylint: disable=line-too-long
+    # [START dofn_params]
+    dofn_params = '''\
+# timestamp
+type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
+timestamp.micros -> 1584675660000000
+timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
+timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
+
+# window
+type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
+window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
+window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
+window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)'''
+    # [END dofn_params]
+    # pylint: enable=line-too-long
+    self.dofn_params_test = lambda actual: \
+        assert_that(actual, equal_to([dofn_params]))
+
+  def test_pardo_dofn(self):
+    pardo_dofn(self.plants_test)
+
+  def test_pardo_dofn_params(self):
+    pardo_dofn_params(self.dofn_params_test)
+
+
+if __name__ == '__main__':
+  unittest.main()