You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/01/13 18:08:11 UTC

[beam] branch master updated: [BEAM-7390] Add code snippet for Min

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aaac9c  [BEAM-7390] Add code snippet for Min
     new 02efc22  Merge pull request #10177 from davidcavazos/min-code
4aaac9c is described below

commit 4aaac9ccd8be9c5bd2abf9e334589f6029505753
Author: David Cavazos <dc...@google.com>
AuthorDate: Tue Nov 19 13:09:35 2019 -0800

    [BEAM-7390] Add code snippet for Min
---
 .../snippets/transforms/aggregation/min.py         | 60 ++++++++++++++++++++++
 .../snippets/transforms/aggregation/min_test.py    | 60 ++++++++++++++++++++++
 2 files changed, 120 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py
new file mode 100644
index 0000000..d004cf2
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py
@@ -0,0 +1,60 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def min_globally(test=None):  # test: optional callback given the result.
+  # [START min_globally]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    min_element = (
+        pipeline
+        | 'Create numbers' >> beam.Create([3, 4, 1, 2])
+        | 'Get min value' >> beam.CombineGlobally(
+            lambda elements: min(elements or [-1]))  # -1 if falsy/empty
+        | beam.Map(print)
+    )
+    # [END min_globally]
+    if test:  # invoked before the `with` block exits
+      test(min_element)
+
+
+def min_per_key(test=None):  # test: optional callback given the result.
+  # [START min_per_key]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    elements_with_min_value_per_key = (
+        pipeline
+        | 'Create produce' >> beam.Create([
+            ('🥕', 3),
+            ('🥕', 2),
+            ('🍆', 1),
+            ('🍅', 4),
+            ('🍅', 5),
+            ('🍅', 3),
+        ])
+        | 'Get min value per key' >> beam.CombinePerKey(min)  # builtin min
+        | beam.Map(print)
+    )
+    # [END min_per_key]
+    if test:  # invoked before the `with` block exits
+      test(elements_with_min_value_per_key)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py
new file mode 100644
index 0000000..321fd12
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py
@@ -0,0 +1,60 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.testing.test_pipeline import TestPipeline
+
+from . import min as beam_min
+
+
+def check_min_element(actual):  # Expects the single output line '1'.
+  expected = '''[START min_element]
+1
+[END min_element]'''.splitlines()[1:-1]  # drop the START/END marker lines
+  assert_matches_stdout(actual, expected)
+
+
+def check_elements_with_min_value_per_key(actual):  # One line per key.
+  expected = '''[START elements_with_min_value_per_key]
+('🥕', 2)
+('🍆', 1)
+('🍅', 3)
+[END elements_with_min_value_per_key]'''.splitlines()[1:-1]  # drop markers
+  assert_matches_stdout(actual, expected)
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)  # swap in TestPipeline
+@mock.patch(  # replace the snippet module's `print` with `str`
+    'apache_beam.examples.snippets.transforms.aggregation.min.print', str)
+class MinTest(unittest.TestCase):  # Runs each snippet with its checker.
+  def test_min_globally(self):
+    beam_min.min_globally(check_min_element)
+
+  def test_min_per_key(self):
+    beam_min.min_per_key(check_elements_with_min_value_per_key)
+
+
+if __name__ == '__main__':  # run the tests when executed as a script
+  unittest.main()