You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/21 23:23:48 UTC

[beam] branch master updated: Add Python snippet for FlatMap transform

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b636d  Add Python snippet for FlatMap transform
     new 31342b0  Merge pull request #8901 from davidcavazos/element-wise-flat-map
19b636d is described below

commit 19b636d1dddc3866272b90c2e375ef3120d17843
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:15:05 2019 -0700

    Add Python snippet for FlatMap transform
---
 .../snippets/transforms/element_wise/flat_map.py   | 221 +++++++++++++++++++++
 .../transforms/element_wise/flat_map_test.py       |  87 ++++++++
 2 files changed, 308 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py
new file mode 100644
index 0000000..c227a71
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py
@@ -0,0 +1,221 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def flat_map_simple(test=None):
+  # [START flat_map_simple]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '🍓Strawberry 🥕Carrot 🍆Eggplant',
+            '🍅Tomato 🥔Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(str.split)
+        | beam.Map(print)
+    )
+    # [END flat_map_simple]
+    if test:
+      test(plants)
+
+
+def flat_map_function(test=None):
+  # [START flat_map_function]
+  import apache_beam as beam
+
+  def split_words(text):
+    return text.split(',')
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '🍓Strawberry,🥕Carrot,🍆Eggplant',
+            '🍅Tomato,🥔Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(split_words)
+        | beam.Map(print)
+    )
+    # [END flat_map_function]
+    if test:
+      test(plants)
+
+
+def flat_map_lambda(test=None):
+  # [START flat_map_lambda]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
+            ['🍅Tomato', '🥔Potato'],
+        ])
+        | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
+        | beam.Map(print)
+    )
+    # [END flat_map_lambda]
+    if test:
+      test(plants)
+
+
+def flat_map_generator(test=None):
+  # [START flat_map_generator]
+  import apache_beam as beam
+
+  def generate_elements(elements):
+    for element in elements:
+      yield element
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
+            ['🍅Tomato', '🥔Potato'],
+        ])
+        | 'Flatten lists' >> beam.FlatMap(generate_elements)
+        | beam.Map(print)
+    )
+    # [END flat_map_generator]
+    if test:
+      test(plants)
+
+
+def flat_map_multiple_arguments(test=None):
+  # [START flat_map_multiple_arguments]
+  import apache_beam as beam
+
+  def split_words(text, delimiter=None):
+    return text.split(delimiter)
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '🍓Strawberry,🥕Carrot,🍆Eggplant',
+            '🍅Tomato,🥔Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(split_words, delimiter=',')
+        | beam.Map(print)
+    )
+    # [END flat_map_multiple_arguments]
+    if test:
+      test(plants)
+
+
+def flat_map_side_inputs_singleton(test=None):
+  # [START flat_map_side_inputs_singleton]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    delimiter = pipeline | 'Create delimiter' >> beam.Create([','])
+
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '🍓Strawberry,🥕Carrot,🍆Eggplant',
+            '🍅Tomato,🥔Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(
+            lambda text, delimiter: text.split(delimiter),
+            delimiter=beam.pvalue.AsSingleton(delimiter),
+        )
+        | beam.Map(print)
+    )
+    # [END flat_map_side_inputs_singleton]
+    if test:
+      test(plants)
+
+
+def flat_map_side_inputs_iter(test=None):
+  # [START flat_map_side_inputs_iter]
+  import apache_beam as beam
+
+  def normalize_and_validate_durations(plant, valid_durations):
+    plant['duration'] = plant['duration'].lower()
+    if plant['duration'] in valid_durations:
+      yield plant
+
+  with beam.Pipeline() as pipeline:
+    valid_durations = pipeline | 'Valid durations' >> beam.Create([
+        'annual',
+        'biennial',
+        'perennial',
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'},
+        ])
+        | 'Normalize and validate durations' >> beam.FlatMap(
+            normalize_and_validate_durations,
+            valid_durations=beam.pvalue.AsIter(valid_durations),
+        )
+        | beam.Map(print)
+    )
+    # [END flat_map_side_inputs_iter]
+    if test:
+      test(valid_plants)
+
+
+def flat_map_side_inputs_dict(test=None):
+  # [START flat_map_side_inputs_dict]
+  import apache_beam as beam
+
+  def replace_duration_if_valid(plant, durations):
+    if plant['duration'] in durations:
+      plant['duration'] = durations[plant['duration']]
+      yield plant
+
+  with beam.Pipeline() as pipeline:
+    durations = pipeline | 'Durations dict' >> beam.Create([
+        (0, 'annual'),
+        (1, 'biennial'),
+        (2, 'perennial'),
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
+            {'icon': '🥔', 'name': 'Potato', 'duration': -1},
+        ])
+        | 'Replace duration if valid' >> beam.FlatMap(
+            replace_duration_if_valid,
+            durations=beam.pvalue.AsDict(durations),
+        )
+        | beam.Map(print)
+    )
+    # [END flat_map_side_inputs_dict]
+    if test:
+      test(valid_plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py
new file mode 100644
index 0000000..43b3b49
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py
@@ -0,0 +1,87 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.flat_map import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.element_wise.flat_map.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class FlatMapTest(unittest.TestCase):
+  def __init__(self, methodName):
+    super(FlatMapTest, self).__init__(methodName)
+    # [START plants]
+    plants = [
+        '🍓Strawberry',
+        '🥕Carrot',
+        '🍆Eggplant',
+        '🍅Tomato',
+        '🥔Potato',
+    ]
+    # [END plants]
+    self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
+
+    # [START valid_plants]
+    valid_plants = [
+        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+    ]
+    # [END valid_plants]
+    self.valid_plants_test = lambda actual: \
+        assert_that(actual, equal_to(valid_plants))
+
+  def test_flat_map_simple(self):
+    flat_map_simple(self.plants_test)
+
+  def test_flat_map_function(self):
+    flat_map_function(self.plants_test)
+
+  def test_flat_map_lambda(self):
+    flat_map_lambda(self.plants_test)
+
+  def test_flat_map_generator(self):
+    flat_map_generator(self.plants_test)
+
+  def test_flat_map_multiple_arguments(self):
+    flat_map_multiple_arguments(self.plants_test)
+
+  def test_flat_map_side_inputs_singleton(self):
+    flat_map_side_inputs_singleton(self.plants_test)
+
+  def test_flat_map_side_inputs_iter(self):
+    flat_map_side_inputs_iter(self.valid_plants_test)
+
+  def test_flat_map_side_inputs_dict(self):
+    flat_map_side_inputs_dict(self.valid_plants_test)
+
+
+# Run the tests in this module when the file is executed as a script.
+if __name__ == '__main__':
+  unittest.main()