You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/21 23:23:48 UTC
[beam] branch master updated: Add Python snippet for FlatMap transform
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 19b636d Add Python snippet for FlatMap transform
new 31342b0 Merge pull request #8901 from davidcavazos/element-wise-flat-map
19b636d is described below
commit 19b636d1dddc3866272b90c2e375ef3120d17843
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:15:05 2019 -0700
Add Python snippet for FlatMap transform
---
.../snippets/transforms/element_wise/flat_map.py | 221 +++++++++++++++++++++
.../transforms/element_wise/flat_map_test.py | 87 ++++++++
2 files changed, 308 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py
new file mode 100644
index 0000000..c227a71
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map.py
@@ -0,0 +1,221 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
def flat_map_simple(test=None):
    """Split lines into words with ``beam.FlatMap(str.split)``.

    Each input string is split on whitespace. Every resulting word becomes
    its own output element.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_simple]
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                '🍓Strawberry 🥕Carrot 🍆Eggplant',
                '🍅Tomato 🥔Potato',
            ])
            | 'Split words' >> beam.FlatMap(str.split)
            | beam.Map(print)
        )
        # [END flat_map_simple]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_function(test=None):
    """Split comma-separated lines into words using a named function.

    ``split_words`` returns a list, and ``beam.FlatMap`` emits each list
    item as a separate output element.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_function]
    import apache_beam as beam

    def split_words(text):
        return text.split(',')

    with beam.Pipeline() as pipeline:
        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                '🍓Strawberry,🥕Carrot,🍆Eggplant',
                '🍅Tomato,🥔Potato',
            ])
            | 'Split words' >> beam.FlatMap(split_words)
            | beam.Map(print)
        )
        # [END flat_map_function]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_lambda(test=None):
    """Flatten a PCollection of lists with an identity lambda.

    Each input element is already a list. Returning it unchanged from the
    lambda makes ``beam.FlatMap`` emit its items individually.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_lambda]
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
                ['🍅Tomato', '🥔Potato'],
            ])
            | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
            | beam.Map(print)
        )
        # [END flat_map_lambda]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_generator(test=None):
    """Flatten a PCollection of lists with a generator function.

    ``generate_elements`` yields each item of an input list. ``beam.FlatMap``
    emits every yielded value as a separate output element.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_generator]
    import apache_beam as beam

    def generate_elements(elements):
        for element in elements:
            yield element

    with beam.Pipeline() as pipeline:
        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
                ['🍅Tomato', '🥔Potato'],
            ])
            | 'Flatten lists' >> beam.FlatMap(generate_elements)
            | beam.Map(print)
        )
        # [END flat_map_generator]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_multiple_arguments(test=None):
    """Pass extra keyword arguments through ``beam.FlatMap``.

    ``delimiter=','`` given to ``beam.FlatMap`` is forwarded to
    ``split_words`` on every call.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_multiple_arguments]
    import apache_beam as beam

    def split_words(text, delimiter=None):
        return text.split(delimiter)

    with beam.Pipeline() as pipeline:
        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                '🍓Strawberry,🥕Carrot,🍆Eggplant',
                '🍅Tomato,🥔Potato',
            ])
            | 'Split words' >> beam.FlatMap(split_words, delimiter=',')
            | beam.Map(print)
        )
        # [END flat_map_multiple_arguments]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_side_inputs_singleton(test=None):
    """Supply the split delimiter as a singleton side input.

    A one-element PCollection holding ``','`` is passed via
    ``beam.pvalue.AsSingleton``, so the lambda receives the bare value.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_side_inputs_singleton]
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        delimiter = pipeline | 'Create delimiter' >> beam.Create([','])

        plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                '🍓Strawberry,🥕Carrot,🍆Eggplant',
                '🍅Tomato,🥔Potato',
            ])
            | 'Split words' >> beam.FlatMap(
                lambda text, delimiter: text.split(delimiter),
                delimiter=beam.pvalue.AsSingleton(delimiter),
            )
            | beam.Map(print)
        )
        # [END flat_map_side_inputs_singleton]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(plants)
+
+
def flat_map_side_inputs_iter(test=None):
    """Filter plants against an iterable side input of valid durations.

    Each plant's ``duration`` is lower-cased. A plant whose normalized
    duration appears in ``valid_durations`` (passed via
    ``beam.pvalue.AsIter``) is emitted as a copy carrying the normalized
    value. Any other plant produces no output, because the generator yields
    nothing for it.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_side_inputs_iter]
    import apache_beam as beam

    def normalize_and_validate_durations(plant, valid_durations):
        duration = plant['duration'].lower()
        if duration in valid_durations:
            # Emit a modified copy: Beam transforms must not mutate
            # the input elements they receive.
            valid_plant = dict(plant)
            valid_plant['duration'] = duration
            yield valid_plant

    with beam.Pipeline() as pipeline:
        valid_durations = pipeline | 'Valid durations' >> beam.Create([
            'annual',
            'biennial',
            'perennial',
        ])

        valid_plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'},
                {'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'},
                {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
                {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
                {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'},
            ])
            | 'Normalize and validate durations' >> beam.FlatMap(
                normalize_and_validate_durations,
                valid_durations=beam.pvalue.AsIter(valid_durations),
            )
            | beam.Map(print)
        )
        # [END flat_map_side_inputs_iter]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(valid_plants)
+
+
def flat_map_side_inputs_dict(test=None):
    """Map numeric durations to names using a dict side input.

    The ``(key, value)`` pairs in ``durations`` are passed via
    ``beam.pvalue.AsDict``. A plant whose numeric ``duration`` is a key in
    that dict is emitted as a copy with the duration replaced by its name.
    Any other plant (here, ``-1``) produces no output.

    Args:
      test: Optional callable that receives the PCollection produced by the
        final ``beam.Map(print)`` step. It is called inside the ``with``
        block so that any assertions it attaches run with the pipeline.
    """
    # [START flat_map_side_inputs_dict]
    import apache_beam as beam

    def replace_duration_if_valid(plant, durations):
        if plant['duration'] in durations:
            # Emit a modified copy: Beam transforms must not mutate
            # the input elements they receive.
            valid_plant = dict(plant)
            valid_plant['duration'] = durations[plant['duration']]
            yield valid_plant

    with beam.Pipeline() as pipeline:
        durations = pipeline | 'Durations dict' >> beam.Create([
            (0, 'annual'),
            (1, 'biennial'),
            (2, 'perennial'),
        ])

        valid_plants = (
            pipeline
            | 'Gardening plants' >> beam.Create([
                {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
                {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
                {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
                {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
                {'icon': '🥔', 'name': 'Potato', 'duration': -1},
            ])
            | 'Replace duration if valid' >> beam.FlatMap(
                replace_duration_if_valid,
                durations=beam.pvalue.AsDict(durations),
            )
            | beam.Map(print)
        )
        # [END flat_map_side_inputs_dict]
        # Keep this inside the `with`: exiting the block runs the pipeline,
        # so transforms attached afterwards would never execute.
        if test:
            test(valid_plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py
new file mode 100644
index 0000000..43b3b49
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/flat_map_test.py
@@ -0,0 +1,87 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.flat_map import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
@mock.patch('apache_beam.Pipeline', TestPipeline)
# pylint: disable=line-too-long
@mock.patch('apache_beam.examples.snippets.transforms.element_wise.flat_map.print', lambda elem: elem)
# pylint: enable=line-too-long
class FlatMapTest(unittest.TestCase):
    """Runs each FlatMap snippet on a TestPipeline and checks its output.

    The class-level patches replace ``apache_beam.Pipeline`` with
    ``TestPipeline``. They also replace the snippet module's ``print`` with
    an identity function, so each snippet's final ``beam.Map(print)`` passes
    elements through unchanged and they can be checked with ``assert_that``.
    """

    def setUp(self):
        # Build the expected outputs per test in setUp rather than overriding
        # TestCase.__init__ (which would also drop its 'runTest' default).
        super(FlatMapTest, self).setUp()
        # [START plants]
        plants = [
            '🍓Strawberry',
            '🥕Carrot',
            '🍆Eggplant',
            '🍅Tomato',
            '🥔Potato',
        ]
        # [END plants]
        self.plants_test = lambda actual: assert_that(actual, equal_to(plants))

        # [START valid_plants]
        valid_plants = [
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        ]
        # [END valid_plants]
        # NOTE(review): the elements here are dicts; confirm the Beam version
        # in use has an `equal_to` that handles unsortable elements on Python 3.
        self.valid_plants_test = lambda actual: assert_that(
            actual, equal_to(valid_plants))

    def test_flat_map_simple(self):
        flat_map_simple(self.plants_test)

    def test_flat_map_function(self):
        flat_map_function(self.plants_test)

    def test_flat_map_lambda(self):
        flat_map_lambda(self.plants_test)

    def test_flat_map_generator(self):
        flat_map_generator(self.plants_test)

    def test_flat_map_multiple_arguments(self):
        flat_map_multiple_arguments(self.plants_test)

    def test_flat_map_side_inputs_singleton(self):
        flat_map_side_inputs_singleton(self.plants_test)

    def test_flat_map_side_inputs_iter(self):
        flat_map_side_inputs_iter(self.valid_plants_test)

    def test_flat_map_side_inputs_dict(self):
        flat_map_side_inputs_dict(self.valid_plants_test)


if __name__ == '__main__':
    # Allow running this module directly as a script.
    unittest.main()