You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/20 23:02:27 UTC
[beam] branch master updated: Add Python snippet for Filter transform
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1a39ecf Add Python snippet for Filter transform
new 4c1f28a Merge pull request #8900 from davidcavazos/element-wise-filter
1a39ecf is described below
commit 1a39ecf1c10aede8f0dba005f4184b85b1556ae8
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:13:19 2019 -0700
Add Python snippet for Filter transform
---
.../snippets/transforms/element_wise/filter.py | 182 +++++++++++++++++++++
.../transforms/element_wise/filter_test.py | 80 +++++++++
2 files changed, 262 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
new file mode 100644
index 0000000..ded8af2
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
@@ -0,0 +1,182 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def filter_function(test=None):
+ # [START filter_function]
+ import apache_beam as beam
+
+ def is_perennial(plant):
+ return plant['duration'] == 'perennial'
+
+ with beam.Pipeline() as pipeline:
+ perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Filter perennials' >> beam.Filter(is_perennial)
+ | beam.Map(print)
+ )
+ # [END filter_function]
+ if test:
+ test(perennials)
+
+
+def filter_lambda(test=None):
+ # [START filter_lambda]
+ import apache_beam as beam
+
+ with beam.Pipeline() as pipeline:
+ perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Filter perennials' >> beam.Filter(
+ lambda plant: plant['duration'] == 'perennial')
+ | beam.Map(print)
+ )
+ # [END filter_lambda]
+ if test:
+ test(perennials)
+
+
+def filter_multiple_arguments(test=None):
+ # [START filter_multiple_arguments]
+ import apache_beam as beam
+
+ def has_duration(plant, duration):
+ return plant['duration'] == duration
+
+ with beam.Pipeline() as pipeline:
+ perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+ | beam.Map(print)
+ )
+ # [END filter_multiple_arguments]
+ if test:
+ test(perennials)
+
+
+def filter_side_inputs_singleton(test=None):
+ # [START filter_side_inputs_singleton]
+ import apache_beam as beam
+
+ with beam.Pipeline() as pipeline:
+ perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
+
+ perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Filter perennials' >> beam.Filter(
+ lambda plant, duration: plant['duration'] >= duration,
+ duration=beam.pvalue.AsSingleton(perennial),
+ )
+ | beam.Map(print)
+ )
+ # [END filter_side_inputs_singleton]
+ if test:
+ test(perennials)
+
+
+def filter_side_inputs_iter(test=None):
+ # [START filter_side_inputs_iter]
+ import apache_beam as beam
+
+ with beam.Pipeline() as pipeline:
+ valid_durations = pipeline | 'Valid durations' >> beam.Create([
+ 'annual',
+ 'biennial',
+ 'perennial',
+ ])
+
+ valid_plants = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
+ ])
+ | 'Filter valid plants' >> beam.Filter(
+ lambda plant, valid_durations: plant['duration'] in valid_durations,
+ valid_durations=beam.pvalue.AsIter(valid_durations),
+ )
+ | beam.Map(print)
+ )
+ # [END filter_side_inputs_iter]
+ if test:
+ test(valid_plants)
+
+
+def filter_side_inputs_dict(test=None):
+ # [START filter_side_inputs_dict]
+ import apache_beam as beam
+
+ with beam.Pipeline() as pipeline:
+ keep_duration = pipeline | 'Duration filters' >> beam.Create([
+ ('annual', False),
+ ('biennial', False),
+ ('perennial', True),
+ ])
+
+ perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Filter plants by duration' >> beam.Filter(
+ lambda plant, keep_duration: keep_duration[plant['duration']],
+ keep_duration=beam.pvalue.AsDict(keep_duration),
+ )
+ | beam.Map(print)
+ )
+ # [END filter_side_inputs_dict]
+ if test:
+ test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
new file mode 100644
index 0000000..02da146
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
@@ -0,0 +1,80 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.filter import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.element_wise.filter.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class FilterTest(unittest.TestCase):
+ def __init__(self, methodName):
+ super(FilterTest, self).__init__(methodName)
+ # [START perennials]
+ perennials = [
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ]
+ # [END perennials]
+ self.perennials_test = lambda actual: \
+ assert_that(actual, equal_to(perennials))
+
+ # [START valid_plants]
+ valid_plants = [
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ ]
+ # [END valid_plants]
+ self.valid_plants_test = lambda actual: \
+ assert_that(actual, equal_to(valid_plants))
+
+ def test_filter_function(self):
+ filter_function(self.perennials_test)
+
+ def test_filter_lambda(self):
+ filter_lambda(self.perennials_test)
+
+ def test_filter_multiple_arguments(self):
+ filter_multiple_arguments(self.perennials_test)
+
+ def test_filter_side_inputs_singleton(self):
+ filter_side_inputs_singleton(self.perennials_test)
+
+ def test_filter_side_inputs_iter(self):
+ filter_side_inputs_iter(self.valid_plants_test)
+
+ def test_filter_side_inputs_dict(self):
+ filter_side_inputs_dict(self.perennials_test)
+
+
+if __name__ == '__main__':
+ unittest.main()  # run this module's tests when executed as a script