You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/29 23:43:22 UTC

[2/2] incubator-beam git commit: Support ValidatesRunner Attribute in Python

Support ValidatesRunner Attribute in Python

This is roughly equivalent to "RunnableOnService" in the Java SDK. See
BEAM-655


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81e7a0f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81e7a0f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81e7a0f6

Branch: refs/heads/python-sdk
Commit: 81e7a0f653864212a5c9d3d0802608f92bb34501
Parents: 5ce75a2
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Nov 17 14:45:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 29 15:43:04 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py | 66 +++++++++++++++--------
 sdks/python/apache_beam/test_pipeline.py | 76 +++++++++++++++++++++++++++
 2 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index f96e8af..ba3553a 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -24,13 +24,13 @@ import re
 import unittest
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsDict
 from apache_beam.pvalue import AsIter as AllOf
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import EmptySideInput
 from apache_beam.pvalue import SideOutputValue
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import Create
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
@@ -42,6 +42,7 @@ from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import WindowFn
+from nose.plugins.attrib import attr
 
 
 class DataflowTest(unittest.TestCase):
@@ -58,8 +59,9 @@ class DataflowTest(unittest.TestCase):
             | 'GroupCounts' >> GroupByKey()
             | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
 
+  @attr('ValidatesRunner')
   def test_word_count(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
     result = (
         (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
@@ -67,8 +69,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_map(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
     result = (lines
               | 'upper' >> Map(str.upper)
@@ -76,8 +79,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_side_input_as_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     words_list = ['aa', 'bb', 'cc']
     words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
@@ -89,8 +93,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_side_input_as_keyword_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     words_list = ['aa', 'bb', 'cc']
     words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
@@ -102,6 +107,7 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_do_fn_object(self):
     class SomeDoFn(DoFn):
       """A custom DoFn for a FlatMap transform."""
@@ -109,7 +115,7 @@ class DataflowTest(unittest.TestCase):
       def process(self, context, prefix, suffix):
         return ['%s-%s-%s' % (prefix, context.element, suffix)]
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     words_list = ['aa', 'bb', 'cc']
     words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
@@ -119,6 +125,7 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_multiple_outputs_and_using_yield(self):
     class SomeDoFn(DoFn):
       """A custom DoFn using yield."""
@@ -130,7 +137,7 @@ class DataflowTest(unittest.TestCase):
         else:
           yield SideOutputValue('odd', context.element)
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | ParDo(
         'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
@@ -139,6 +146,7 @@ class DataflowTest(unittest.TestCase):
     assert_that(results.even, equal_to([2, 4]), label='assert:even')
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_multiple_outputs_and_using_return(self):
     def some_fn(v):
       if v % 2 == 0:
@@ -146,7 +154,7 @@ class DataflowTest(unittest.TestCase):
       else:
         return [v, SideOutputValue('odd', v)]
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
@@ -155,8 +163,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(results.even, equal_to([2, 4]), label='assert:even')
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_empty_singleton_side_input(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([])  # Empty side input.
 
@@ -167,24 +176,27 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_multi_valued_singleton_side_input(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
     pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
     with self.assertRaises(ValueError):
       pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_default_value_singleton_side_input(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([])  # 0 values in side input.
     result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
     assert_that(result, equal_to([10, 20]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_iterable_side_input(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
     result = pcol | FlatMap('compute',
@@ -192,8 +204,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(result, equal_to([3, 4, 6, 8]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_undeclared_side_outputs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers',
@@ -204,8 +217,9 @@ class DataflowTest(unittest.TestCase):
     assert_that(results.even, equal_to([2, 4]), label='assert:even')
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_empty_side_outputs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
     results = nums | FlatMap(
         'ClassifyNumbers',
@@ -216,10 +230,11 @@ class DataflowTest(unittest.TestCase):
     assert_that(results.even, equal_to([]), label='assert:even')
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_list_and_as_dict_side_inputs(self):
     a_list = [5, 1, 3, 2, 9]
     some_pairs = [('crouton', 17), ('supreme', None)]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
     side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
@@ -239,11 +254,12 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, a_list, some_pairs))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_singleton_without_unique_labels(self):
     # This should succeed as calling AsSingleton on the same PCollection twice
     # with the same defaults will return the same PCollectionView.
     a_list = [2]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
@@ -262,11 +278,12 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, 2))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_singleton_with_different_defaults_without_unique_labels(self):
     # This should fail as AsSingleton with distinct default values should create
     # distinct PCollectionViews with the same full_label.
     a_list = [2]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
 
@@ -280,9 +297,10 @@ class DataflowTest(unittest.TestCase):
             'Transform "ViewAsSingleton(side list.None)" does not have a '
             'stable unique label.'))
 
+  @attr('ValidatesRunner')
   def test_as_singleton_with_different_defaults_with_unique_labels(self):
     a_list = []
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
@@ -302,11 +320,12 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, 2, 3))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_list_without_unique_labels(self):
     # This should succeed as calling AsList on the same PCollection twice will
     # return the same PCollectionView.
     a_list = [1, 2, 3]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
@@ -325,9 +344,10 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, [1, 2, 3]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_list_with_unique_labels(self):
     a_list = [1, 2, 3]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
@@ -346,9 +366,10 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, [1, 2, 3]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_as_dict_with_unique_labels(self):
     some_kvs = [('a', 1), ('b', 2)]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     main_input = pipeline | 'main input' >> Create([1])
     side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
     results = main_input | FlatMap(
@@ -367,6 +388,7 @@ class DataflowTest(unittest.TestCase):
     assert_that(results, matcher(1, some_kvs))
     pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_window_transform(self):
     class TestWindowFn(WindowFn):
       """Windowing function adding two disjoint windows to each element."""
@@ -378,7 +400,7 @@ class DataflowTest(unittest.TestCase):
       def merge(self, existing_windows):
         return existing_windows
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = TestPipeline()
     numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
     result = (numbers
               | 'W' >> WindowInto(windowfn=TestWindowFn())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
new file mode 100644
index 0000000..be64a7a
--- /dev/null
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test Pipeline, a wrapper of Pipeline for test purpose"""
+
+import argparse
+import shlex
+
+from apache_beam.pipeline import Pipeline
+from apache_beam.utils.options import PipelineOptions
+
+
+class TestPipeline(Pipeline):
+  """TestPipeline class is used inside of Beam tests that can be configured to
+  run against a pipeline runner.
+
+  It parses arguments from the command line and builds pipeline options for
+  tests that run against a pipeline runner and utilize resources of the
+  pipeline runner. Those test functions are recommended to be tagged with the
+  @attr("ValidatesRunner") annotation.
+
+  To configure the test with customized pipeline options from the command
+  line, the system argument 'test-pipeline-options' can be used to obtain a
+  list of pipeline options. If no options are specified, defaults are used.
+
+  For example, use the following command line to run all ValidatesRunner tests::
+
+    python setup.py nosetests -a ValidatesRunner \
+        --test-pipeline-options="--runner=DirectPipelineRunner \
+                                 --job_name=myJobName \
+                                 --num_workers=1"
+
+  For example, use assert_that for test validation::
+
+    pipeline = TestPipeline()
+    pcoll = ...
+    assert_that(pcoll, equal_to(...))
+    pipeline.run()
+  """
+
+  def __init__(self, runner=None, options=None, argv=None):
+    if options is None:
+      options = self.create_pipeline_opt_from_args()
+    super(TestPipeline, self).__init__(runner, options, argv)
+
+  def create_pipeline_opt_from_args(self):
+    """Create a pipeline options from command line argument:
+    --test-pipeline-options
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--test-pipeline-options',
+                        type=str,
+                        action='store',
+                        help='only run tests providing service options')
+    known, unused_argv = parser.parse_known_args()
+
+    if known.test_pipeline_options:
+      options = shlex.split(known.test_pipeline_options)
+    else:
+      options = []
+
+    return PipelineOptions(options)