You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/25 01:47:54 UTC
[beam] branch master updated: Add Python snippet for ParDo transform
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 08f600b Add Python snippet for ParDo transform
new 754a2ec Merge pull request #8903 from davidcavazos/element-wise-pardo
08f600b is described below
commit 08f600bc34e8e31cebc25cf1e752447ed736312e
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:23:45 2019 -0700
Add Python snippet for ParDo transform
---
.../snippets/transforms/element_wise/pardo.py | 83 ++++++++++++++++++++++
.../snippets/transforms/element_wise/pardo_test.py | 77 ++++++++++++++++++++
2 files changed, 160 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
new file mode 100644
index 0000000..d8ce5bf
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
@@ -0,0 +1,83 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def pardo_dofn(test=None):
+ # [START pardo_dofn]
+ import apache_beam as beam
+
+ class SplitWords(beam.DoFn):
+ def __init__(self, delimiter=','):
+ self.delimiter = delimiter
+
+ def process(self, text):
+ return text.split(self.delimiter)
+
+ with beam.Pipeline() as pipeline:
+ plants = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ '🍓Strawberry,🥕Carrot,🍆Eggplant',
+ '🍅Tomato,🥔Potato',
+ ])
+ | 'Split words' >> beam.ParDo(SplitWords(','))
+ | beam.Map(print)
+ )
+ # [END pardo_dofn]
+ if test:
+ test(plants)
+
+
+def pardo_dofn_params(test=None):
+ # [START pardo_dofn_params]
+ import apache_beam as beam
+
+ # pylint: disable=line-too-long
+ class AnalyzeElement(beam.DoFn):
+ def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
+ yield '\n'.join([
+ '# timestamp',
+ 'type(timestamp) -> ' + repr(type(timestamp)),
+ 'timestamp.micros -> ' + repr(timestamp.micros),
+ 'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
+ 'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
+ '',
+ '# window',
+ 'type(window) -> ' + repr(type(window)),
+ 'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
+ 'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
+ 'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
+ ])
+ # pylint: enable=line-too-long
+
+ with beam.Pipeline() as pipeline:
+ dofn_params = (
+ pipeline
+ | 'Create a single test element' >> beam.Create([':)'])
+ | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
+ lambda elem: beam.window.TimestampedValue(elem, 1584675660))
+ | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
+ | 'Analyze element' >> beam.ParDo(AnalyzeElement())
+ | beam.Map(print)
+ )
+ # [END pardo_dofn_params]
+ if test:
+ test(dofn_params)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
new file mode 100644
index 0000000..9cb617e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
@@ -0,0 +1,77 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.pardo import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.element_wise.pardo.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class ParDoTest(unittest.TestCase):
+ def __init__(self, methodName):
+ super(ParDoTest, self).__init__(methodName)
+ # [START plants]
+ plants = [
+ '🍓Strawberry',
+ '🥕Carrot',
+ '🍆Eggplant',
+ '🍅Tomato',
+ '🥔Potato',
+ ]
+ # [END plants]
+ self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
+
+ # pylint: disable=line-too-long
+ # [START dofn_params]
+ dofn_params = '''\
+# timestamp
+type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
+timestamp.micros -> 1584675660000000
+timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
+timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
+
+# window
+type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
+window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
+window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
+window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)'''
+ # [END dofn_params]
+ # pylint: enable=line-too-long
+ self.dofn_params_test = lambda actual: \
+ assert_that(actual, equal_to([dofn_params]))
+
+ def test_pardo_dofn(self):
+ pardo_dofn(self.plants_test)
+
+ def test_pardo_dofn_params(self):
+ pardo_dofn_params(self.dofn_params_test)
+
+
+if __name__ == '__main__':
+ unittest.main()