You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/29 23:43:22 UTC
[2/2] incubator-beam git commit: Support ValidatesRunner Attribute in
Python
Support ValidatesRunner Attribute in Python
This is roughly equivalent to "RunnableOnService" in the Java SDK. See
BEAM-655
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81e7a0f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81e7a0f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81e7a0f6
Branch: refs/heads/python-sdk
Commit: 81e7a0f653864212a5c9d3d0802608f92bb34501
Parents: 5ce75a2
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Nov 17 14:45:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 29 15:43:04 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 66 +++++++++++++++--------
sdks/python/apache_beam/test_pipeline.py | 76 +++++++++++++++++++++++++++
2 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index f96e8af..ba3553a 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -24,13 +24,13 @@ import re
import unittest
import apache_beam as beam
-from apache_beam.pipeline import Pipeline
from apache_beam.pvalue import AsDict
from apache_beam.pvalue import AsIter as AllOf
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.pvalue import EmptySideInput
from apache_beam.pvalue import SideOutputValue
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
from apache_beam.transforms import FlatMap
@@ -42,6 +42,7 @@ from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import WindowFn
+from nose.plugins.attrib import attr
class DataflowTest(unittest.TestCase):
@@ -58,8 +59,9 @@ class DataflowTest(unittest.TestCase):
| 'GroupCounts' >> GroupByKey()
| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
+ @attr('ValidatesRunner')
def test_word_count(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
result = (
(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
@@ -67,8 +69,9 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
pipeline.run()
+ @attr('ValidatesRunner')
def test_map(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
result = (lines
| 'upper' >> Map(str.upper)
@@ -76,8 +79,9 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
pipeline.run()
+ @attr('ValidatesRunner')
def test_par_do_with_side_input_as_arg(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
words_list = ['aa', 'bb', 'cc']
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in
@@ -89,8 +93,9 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_par_do_with_side_input_as_keyword_arg(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
words_list = ['aa', 'bb', 'cc']
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
@@ -102,6 +107,7 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_par_do_with_do_fn_object(self):
class SomeDoFn(DoFn):
"""A custom DoFn for a FlatMap transform."""
@@ -109,7 +115,7 @@ class DataflowTest(unittest.TestCase):
def process(self, context, prefix, suffix):
return ['%s-%s-%s' % (prefix, context.element, suffix)]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
words_list = ['aa', 'bb', 'cc']
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
@@ -119,6 +125,7 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_par_do_with_multiple_outputs_and_using_yield(self):
class SomeDoFn(DoFn):
"""A custom DoFn using yield."""
@@ -130,7 +137,7 @@ class DataflowTest(unittest.TestCase):
else:
yield SideOutputValue('odd', context.element)
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | ParDo(
'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
@@ -139,6 +146,7 @@ class DataflowTest(unittest.TestCase):
assert_that(results.even, equal_to([2, 4]), label='assert:even')
pipeline.run()
+ @attr('ValidatesRunner')
def test_par_do_with_multiple_outputs_and_using_return(self):
def some_fn(v):
if v % 2 == 0:
@@ -146,7 +154,7 @@ class DataflowTest(unittest.TestCase):
else:
return [v, SideOutputValue('odd', v)]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
@@ -155,8 +163,9 @@ class DataflowTest(unittest.TestCase):
assert_that(results.even, equal_to([2, 4]), label='assert:even')
pipeline.run()
+ @attr('ValidatesRunner')
def test_empty_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # Empty side input.
@@ -167,24 +176,27 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_multi_valued_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
with self.assertRaises(ValueError):
pipeline.run()
+ @attr('ValidatesRunner')
def test_default_value_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # 0 values in side input.
result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
assert_that(result, equal_to([10, 20]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_iterable_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
result = pcol | FlatMap('compute',
@@ -192,8 +204,9 @@ class DataflowTest(unittest.TestCase):
assert_that(result, equal_to([3, 4, 6, 8]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_undeclared_side_outputs(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers',
@@ -204,8 +217,9 @@ class DataflowTest(unittest.TestCase):
assert_that(results.even, equal_to([2, 4]), label='assert:even')
pipeline.run()
+ @attr('ValidatesRunner')
def test_empty_side_outputs(self):
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
results = nums | FlatMap(
'ClassifyNumbers',
@@ -216,10 +230,11 @@ class DataflowTest(unittest.TestCase):
assert_that(results.even, equal_to([]), label='assert:even')
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_list_and_as_dict_side_inputs(self):
a_list = [5, 1, 3, 2, 9]
some_pairs = [('crouton', 17), ('supreme', None)]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
@@ -239,11 +254,12 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, a_list, some_pairs))
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_singleton_without_unique_labels(self):
# This should succeed as calling AsSingleton on the same PCollection twice
# with the same defaults will return the same PCollectionView.
a_list = [2]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
@@ -262,11 +278,12 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, 2))
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_singleton_with_different_defaults_without_unique_labels(self):
# This should fail as AsSingleton with distinct default values should create
# distinct PCollectionViews with the same full_label.
a_list = [2]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
@@ -280,9 +297,10 @@ class DataflowTest(unittest.TestCase):
'Transform "ViewAsSingleton(side list.None)" does not have a '
'stable unique label.'))
+ @attr('ValidatesRunner')
def test_as_singleton_with_different_defaults_with_unique_labels(self):
a_list = []
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
@@ -302,11 +320,12 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, 2, 3))
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_list_without_unique_labels(self):
# This should succeed as calling AsList on the same PCollection twice will
# return the same PCollectionView.
a_list = [1, 2, 3]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
@@ -325,9 +344,10 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, [1, 2, 3]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_list_with_unique_labels(self):
a_list = [1, 2, 3]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
@@ -346,9 +366,10 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, [1, 2, 3]))
pipeline.run()
+ @attr('ValidatesRunner')
def test_as_dict_with_unique_labels(self):
some_kvs = [('a', 1), ('b', 2)]
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
main_input = pipeline | 'main input' >> Create([1])
side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
results = main_input | FlatMap(
@@ -367,6 +388,7 @@ class DataflowTest(unittest.TestCase):
assert_that(results, matcher(1, some_kvs))
pipeline.run()
+ @attr('ValidatesRunner')
def test_window_transform(self):
class TestWindowFn(WindowFn):
"""Windowing function adding two disjoint windows to each element."""
@@ -378,7 +400,7 @@ class DataflowTest(unittest.TestCase):
def merge(self, existing_windows):
return existing_windows
- pipeline = Pipeline('DirectPipelineRunner')
+ pipeline = TestPipeline()
numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
result = (numbers
| 'W' >> WindowInto(windowfn=TestWindowFn())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
new file mode 100644
index 0000000..be64a7a
--- /dev/null
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test Pipeline, a wrapper of Pipeline for test purposes."""
+
+import argparse
+import shlex
+
+from apache_beam.pipeline import Pipeline
+from apache_beam.utils.options import PipelineOptions
+
+
+class TestPipeline(Pipeline):
+ """TestPipeline class is used inside of Beam tests that can be configured to
+ run against pipeline runner.
+
+ It provides functionality to parse arguments from the command line and build
+ pipeline options for tests that run against a pipeline runner and utilize its
+ resources. Those test functions are recommended to be tagged with the
+ @attr("ValidatesRunner") annotation.
+
+ In order to configure the test with customized pipeline options from the
+ command line, the system argument 'test-pipeline-options' can be used to
+ obtain a list of pipeline options. If no options are specified, default
+ values will be used.
+
+ For example, use following command line to execute all ValidatesRunner tests::
+
+ python setup.py nosetests -a ValidatesRunner \
+ --test-pipeline-options="--runner=DirectPipelineRunner \
+ --job_name=myJobName \
+ --num_workers=1"
+
+ For example, use assert_that for test validation::
+
+ pipeline = TestPipeline()
+ pcoll = ...
+ assert_that(pcoll, equal_to(...))
+ pipeline.run()
+ """
+
+ def __init__(self, runner=None, options=None, argv=None):
+ if options is None:
+ options = self.create_pipeline_opt_from_args()
+ super(TestPipeline, self).__init__(runner, options, argv)
+
+ def create_pipeline_opt_from_args(self):
+ """Create pipeline options from the command line argument
+ --test-pipeline-options.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--test-pipeline-options',
+ type=str,
+ action='store',
+ help='only run tests providing service options')
+ known, unused_argv = parser.parse_known_args()
+
+ if known.test_pipeline_options:
+ options = shlex.split(known.test_pipeline_options)
+ else:
+ options = []
+
+ return PipelineOptions(options)