You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/06/20 23:02:27 UTC

[beam] branch master updated: Add Python snippet for Filter transform

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a39ecf  Add Python snippet for Filter transform
     new 4c1f28a  Merge pull request #8900 from davidcavazos/element-wise-filter
1a39ecf is described below

commit 1a39ecf1c10aede8f0dba005f4184b85b1556ae8
Author: David Cavazos <dc...@google.com>
AuthorDate: Mon Jun 10 16:13:19 2019 -0700

    Add Python snippet for Filter transform
---
 .../snippets/transforms/element_wise/filter.py     | 182 +++++++++++++++++++++
 .../transforms/element_wise/filter_test.py         |  80 +++++++++
 2 files changed, 262 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
new file mode 100644
index 0000000..ded8af2
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
@@ -0,0 +1,182 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def filter_function(test=None):  # Filters plants with a named predicate.
+  # [START filter_function]
+  import apache_beam as beam
+
+  def is_perennial(plant):
+    return plant['duration'] == 'perennial'
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(is_perennial)
+        | beam.Map(print)
+    )
+    # [END filter_function]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(perennials)
+
+
+def filter_lambda(test=None):  # Filters plants with an inline lambda.
+  # [START filter_lambda]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(
+            lambda plant: plant['duration'] == 'perennial')
+        | beam.Map(print)
+    )
+    # [END filter_lambda]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(perennials)
+
+
+def filter_multiple_arguments(test=None):  # Extra predicate argument.
+  # [START filter_multiple_arguments]
+  import apache_beam as beam
+
+  def has_duration(plant, duration):
+    return plant['duration'] == duration
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+        | beam.Map(print)
+    )
+    # [END filter_multiple_arguments]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(perennials)
+
+
+def filter_side_inputs_singleton(test=None):  # Singleton side input.
+  # [START filter_side_inputs_singleton]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
+
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(
+            lambda plant, duration: plant['duration'] == duration,
+            duration=beam.pvalue.AsSingleton(perennial),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_singleton]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(perennials)
+
+
+def filter_side_inputs_iter(test=None):  # Filters via an iterable side input.
+  # [START filter_side_inputs_iter]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    valid_durations = pipeline | 'Valid durations' >> beam.Create([
+        'annual',
+        'biennial',
+        'perennial',
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
+        ])
+        | 'Filter valid plants' >> beam.Filter(
+            lambda plant, valid_durations: plant['duration'] in valid_durations,
+            valid_durations=beam.pvalue.AsIter(valid_durations),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_iter]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(valid_plants)
+
+
+def filter_side_inputs_dict(test=None):  # Filters via a dict side input.
+  # [START filter_side_inputs_dict]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    keep_duration = pipeline | 'Duration filters' >> beam.Create([
+        ('annual', False),
+        ('biennial', False),
+        ('perennial', True),
+    ])
+
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter plants by duration' >> beam.Filter(
+            lambda plant, keep_duration: keep_duration[plant['duration']],
+            keep_duration=beam.pvalue.AsDict(keep_duration),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_dict]
+    if test:  # Optional hook; runs inside the `with`, before it exits.
+      test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
new file mode 100644
index 0000000..02da146
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
@@ -0,0 +1,80 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.transforms.element_wise.filter import *
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.element_wise.filter.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class FilterTest(unittest.TestCase):  # Exercises the filter.py snippets.
+  def __init__(self, methodName):  # Builds the expected-output checkers.
+    super(FilterTest, self).__init__(methodName)
+    # [START perennials]
+    perennials = [
+        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+    ]
+    # [END perennials]
+    self.perennials_test = lambda actual: \
+        assert_that(actual, equal_to(perennials))
+
+    # [START valid_plants]
+    valid_plants = [
+        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+    ]
+    # [END valid_plants]
+    self.valid_plants_test = lambda actual: \
+        assert_that(actual, equal_to(valid_plants))
+
+  def test_filter_function(self):
+    filter_function(self.perennials_test)
+
+  def test_filter_lambda(self):
+    filter_lambda(self.perennials_test)
+
+  def test_filter_multiple_arguments(self):
+    filter_multiple_arguments(self.perennials_test)
+
+  def test_filter_side_inputs_singleton(self):
+    filter_side_inputs_singleton(self.perennials_test)
+
+  def test_filter_side_inputs_iter(self):  # Expects Potato to be dropped.
+    filter_side_inputs_iter(self.valid_plants_test)
+
+  def test_filter_side_inputs_dict(self):
+    filter_side_inputs_dict(self.perennials_test)
+
+
+if __name__ == '__main__':  # Allow running this test module directly.
+  unittest.main()