You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/22 20:39:23 UTC
[1/3] incubator-beam git commit: Add Hamcrest To Tox For
autocomplete_test Execution
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 8d8214148 -> d5c0175ca
Add Hamcrest To Tox For autocomplete_test Execution
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f10cdef4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f10cdef4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f10cdef4
Branch: refs/heads/python-sdk
Commit: f10cdef40c232dd5e09d4dcf0092e165d9d6aa73
Parents: 8d82141
Author: Mark Liu <ma...@google.com>
Authored: Tue Dec 20 14:18:52 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 12:38:43 2016 -0800
----------------------------------------------------------------------
sdks/python/tox.ini | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f10cdef4/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 0110273..4e07968 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -27,6 +27,7 @@ select = E3
deps=
pep8
pylint
+ pyhamcrest
commands =
python --version
python apache_beam/examples/complete/autocomplete_test.py
[3/3] incubator-beam git commit: Closes #1639
Posted by ro...@apache.org.
Closes #1639
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5c0175c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5c0175c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5c0175c
Branch: refs/heads/python-sdk
Commit: d5c0175cac6768fcfdb903fd847a8b5a0258badc
Parents: 8d82141 5df8da3
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Dec 22 12:38:45 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 12:38:45 2016 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 43 +++++++++++++
sdks/python/apache_beam/runners/__init__.py | 1 +
sdks/python/apache_beam/runners/runner.py | 10 +++-
sdks/python/apache_beam/runners/runner_test.py | 4 ++
.../python/apache_beam/runners/test/__init__.py | 24 ++++++++
.../runners/test/test_dataflow_runner.py | 38 ++++++++++++
sdks/python/apache_beam/test_pipeline.py | 47 +++++++++++----
sdks/python/apache_beam/test_pipeline_test.py | 63 ++++++++++++++++++++
sdks/python/apache_beam/tests/__init__.py | 16 +++++
.../apache_beam/tests/pipeline_verifiers.py | 50 ++++++++++++++++
.../tests/pipeline_verifiers_test.py | 54 +++++++++++++++++
sdks/python/apache_beam/utils/options.py | 19 ++++++
.../utils/pipeline_options_validator.py | 37 +++++++++++-
.../utils/pipeline_options_validator_test.py | 41 +++++++++++++
sdks/python/setup.cfg | 1 -
sdks/python/tox.ini | 3 +-
16 files changed, 434 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: [BEAM-1112] Python E2E Test
Framework And Wordcount E2E Test
Posted by ro...@apache.org.
[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5df8da33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5df8da33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5df8da33
Branch: refs/heads/python-sdk
Commit: 5df8da33137f48c7b80301c1d6b4aad96baf8a37
Parents: f10cdef
Author: Mark Liu <ma...@google.com>
Authored: Thu Dec 15 17:41:20 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 12:38:44 2016 -0800
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 43 +++++++++++++
sdks/python/apache_beam/runners/__init__.py | 1 +
sdks/python/apache_beam/runners/runner.py | 10 +++-
sdks/python/apache_beam/runners/runner_test.py | 4 ++
.../python/apache_beam/runners/test/__init__.py | 24 ++++++++
.../runners/test/test_dataflow_runner.py | 38 ++++++++++++
sdks/python/apache_beam/test_pipeline.py | 47 +++++++++++----
sdks/python/apache_beam/test_pipeline_test.py | 63 ++++++++++++++++++++
sdks/python/apache_beam/tests/__init__.py | 16 +++++
.../apache_beam/tests/pipeline_verifiers.py | 50 ++++++++++++++++
.../tests/pipeline_verifiers_test.py | 54 +++++++++++++++++
sdks/python/apache_beam/utils/options.py | 19 ++++++
.../utils/pipeline_options_validator.py | 37 +++++++++++-
.../utils/pipeline_options_validator_test.py | 41 +++++++++++++
sdks/python/setup.cfg | 1 -
sdks/python/tox.ini | 4 +-
16 files changed, 434 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
new file mode 100644
index 0000000..a4e2b16
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the wordcount example."""
+
+import logging
+import unittest
+
+from apache_beam.examples import wordcount
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from nose.plugins.attrib import attr
+
+
+class WordCountIT(unittest.TestCase):
+
+ @attr('IT')
+ def test_wordcount_it(self):
+ # Set extra options to the pipeline for test purpose
+ extra_opts = {'on_success_matcher': PipelineStateMatcher()}
+
+ # Get pipeline options from command argument: --test-pipeline-options,
+ # and start pipeline job by calling pipeline main function.
+ test_pipeline = TestPipeline()
+ wordcount.run(test_pipeline.get_test_option_args(**extra_opts))
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.DEBUG)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 40ced50..0c2e9c5 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -26,3 +26,4 @@ from apache_beam.runners.direct.direct_runner import EagerPipelineRunner
from apache_beam.runners.runner import create_runner
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ec15bee..f138260 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -29,6 +29,7 @@ import tempfile
_KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'EagerPipelineRunner')
_KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
'BlockingDataflowPipelineRunner')
+_KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
def create_runner(runner_name):
@@ -36,8 +37,8 @@ def create_runner(runner_name):
Args:
runner_name: Name of the pipeline runner. Possible values are:
- DirectPipelineRunner, DataflowPipelineRunner and
- BlockingDataflowPipelineRunner.
+ DirectPipelineRunner, DataflowPipelineRunner,
+ BlockingDataflowPipelineRunner and TestDataflowRunner.
Returns:
A runner object.
@@ -49,6 +50,8 @@ def create_runner(runner_name):
runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name
elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
runner_name = 'apache_beam.runners.dataflow_runner.' + runner_name
+ elif runner_name in _KNOWN_TEST_RUNNERS:
+ runner_name = 'apache_beam.runners.test.' + runner_name
if '.' in runner_name:
module, runner = runner_name.rsplit('.', 1)
@@ -58,7 +61,8 @@ def create_runner(runner_name):
'Unexpected pipeline runner: %s. Valid values are %s '
'or the fully qualified name of a PipelineRunner subclass.' % (
runner_name,
- ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS)))
+ ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS +
+ _KNOWN_TEST_RUNNERS)))
class PipelineRunner(object):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 0ba42d3..cc9450e 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -33,6 +33,7 @@ from apache_beam.pipeline import Pipeline
from apache_beam.runners import create_runner
from apache_beam.runners import DataflowPipelineRunner
from apache_beam.runners import DirectPipelineRunner
+from apache_beam.runners import TestDataflowRunner
import apache_beam.transforms as ptransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.options import PipelineOptions
@@ -56,6 +57,9 @@ class RunnerTest(unittest.TestCase):
self.assertTrue(
isinstance(create_runner('BlockingDataflowPipelineRunner'),
DataflowPipelineRunner))
+ self.assertTrue(
+ isinstance(create_runner('TestDataflowRunner'),
+ TestDataflowRunner))
self.assertRaises(ValueError, create_runner, 'xyz')
def test_remote_runner_translation(self):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/test/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/__init__.py b/sdks/python/apache_beam/runners/test/__init__.py
new file mode 100644
index 0000000..6fa483b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/test/__init__.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test runner objects that are only for end-to-end tests.
+
+This package defines runners, which are used to execute test pipeline and
+verify results.
+"""
+
+from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
new file mode 100644
index 0000000..aaf5eb8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
+
+from apache_beam.internal import pickler
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.utils.options import TestOptions
+
+
+class TestDataflowRunner(DataflowPipelineRunner):
+
+ def __init__(self):
+ super(TestDataflowRunner, self).__init__(blocking=True)
+
+ def run(self, pipeline):
+ """Execute test pipeline and verify test matcher"""
+ self.result = super(TestDataflowRunner, self).run(pipeline)
+
+ options = pipeline.options.view_as(TestOptions)
+ if options.on_success_matcher:
+ from hamcrest import assert_that as hc_assert_that
+ hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+ return self.result
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index be64a7a..5794380 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -20,6 +20,7 @@
import argparse
import shlex
+from apache_beam.internal import pickler
from apache_beam.pipeline import Pipeline
from apache_beam.utils.options import PipelineOptions
@@ -54,23 +55,49 @@ class TestPipeline(Pipeline):
def __init__(self, runner=None, options=None, argv=None):
if options is None:
- options = self.create_pipeline_opt_from_args()
+ options = PipelineOptions(self.get_test_option_args())
super(TestPipeline, self).__init__(runner, options, argv)
- def create_pipeline_opt_from_args(self):
- """Create a pipeline options from command line argument:
- --test-pipeline-options
+ def _append_extra_opts(self, opt_list, extra_opts):
+ """Append extra pipeline options to existing option list.
+
+ The test verifier (if present) should be pickled before being appended,
+ and will be unpickled later in TestRunner.
+ """
+ for k, v in extra_opts.items():
+ if not v:
+ continue
+ elif isinstance(v, bool) and v:
+ opt_list.append('--%s' % k)
+ elif 'matcher' in k:
+ opt_list.append('--%s=%s' % (k, pickler.dumps(v)))
+ else:
+ opt_list.append('--%s=%s' % (k, v))
+
+ def get_test_option_args(self, argv=None, **kwargs):
+ """Get pipeline options as argument list by parsing value of command line
+ argument: --test-pipeline-options combined with given extra options.
+
+ Args:
+ argv: An iterable of command line arguments to be used. If not specified
+ then sys.argv will be used as input for parsing arguments.
+ kwargs: Extra pipeline options for the test.
+
+ Returns:
+ An argument list of options that can be parsed by argparser or directly
+ build a pipeline option.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--test-pipeline-options',
type=str,
action='store',
help='only run tests providing service options')
- known, unused_argv = parser.parse_known_args()
+ known, unused_argv = parser.parse_known_args(argv)
+
+ options_list = shlex.split(known.test_pipeline_options) \
+ if known.test_pipeline_options else []
- if known.test_pipeline_options:
- options = shlex.split(known.test_pipeline_options)
- else:
- options = []
+ if kwargs:
+ self._append_extra_opts(options_list, kwargs)
- return PipelineOptions(options)
+ return options_list
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
new file mode 100644
index 0000000..747f0ef
--- /dev/null
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for the TestPipeline class"""
+
+import unittest
+
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.utils.options import PipelineOptions
+
+
+class TestPipelineTest(unittest.TestCase):
+
+ TEST_CASE = {'options':
+ ['--test-pipeline-options', '--job=mockJob --male --age=1'],
+ 'expected_list': ['--job=mockJob', '--male', '--age=1'],
+ 'expected_dict': {'job': 'mockJob',
+ 'male': True,
+ 'age': 1}}
+
+ def setUp(self):
+ self.pipeline = TestPipeline()
+
+ # Used for testing pipeline option creation.
+ class TestParsingOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--job', action='store', help='mock job')
+ parser.add_argument('--male', action='store_true', help='mock gender')
+ parser.add_argument('--age', action='store', type=int, help='mock age')
+
+ def test_option_args_parsing(self):
+ self.assertListEqual(
+ self.pipeline.get_test_option_args(argv=self.TEST_CASE['options']),
+ self.TEST_CASE['expected_list'])
+
+ def test_create_test_pipeline_options(self):
+ test_options = PipelineOptions(
+ self.pipeline.get_test_option_args(self.TEST_CASE['options']))
+ self.assertDictContainsSubset(
+ self.TEST_CASE['expected_dict'], test_options.get_all_options())
+
+ def test_append_extra_options(self):
+ extra_opt = {'name': 'Mark'}
+ options_list = self.pipeline.get_test_option_args(
+ argv=self.TEST_CASE['options'], **extra_opt)
+ expected_list = self.TEST_CASE['expected_list'] + ['--name=Mark']
+ self.assertListEqual(expected_list, options_list)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/__init__.py b/sdks/python/apache_beam/tests/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/tests/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
new file mode 100644
index 0000000..1a6dd45
--- /dev/null
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test result verifiers
+
+A set of verifiers that are used in end-to-end tests to verify state/output
+of test pipeline job. Customized verifier should extend
+`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
+"""
+
+from apache_beam.runners.runner import PipelineState
+from hamcrest.core.base_matcher import BaseMatcher
+
+
+class PipelineStateMatcher(BaseMatcher):
+ """Matcher that verifies the pipeline job terminated in the expected state
+
+ The matcher compares the pipeline's actual terminal state with the expected one.
+ By default, `PipelineState.DONE` is used as expected state.
+ """
+
+ def __init__(self, expected_state=PipelineState.DONE):
+ self.expected_state = expected_state
+
+ def _matches(self, pipeline_result):
+ return pipeline_result.current_state() == self.expected_state
+
+ def describe_to(self, description):
+ description \
+ .append_text("Test pipeline expected terminated in state: ") \
+ .append_text(self.expected_state)
+
+ def describe_mismatch(self, pipeline_result, mismatch_description):
+ mismatch_description \
+ .append_text("Test pipeline job terminated in state: ") \
+ .append_text(pipeline_result.current_state())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
new file mode 100644
index 0000000..e32f603
--- /dev/null
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test pipeline verifiers"""
+
+import logging
+import unittest
+
+from apache_beam.internal.clients import dataflow
+from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from hamcrest import assert_that as hc_assert_that
+
+
+class PipelineVerifiersTest(unittest.TestCase):
+
+ def test_dataflow_job_state_matcher_success(self):
+ """Test DataflowJobStateMatcher succeeds when the job finished in DONE"""
+ pipeline_result = PipelineResult(PipelineState.DONE)
+ hc_assert_that(pipeline_result, PipelineStateMatcher())
+
+ def test_pipeline_state_matcher_fails(self):
+ """Test DataflowJobStateMatcher fails when job finished in
+ FAILED/CANCELLED/STOPPED/UNKNOWN/DRAINED"""
+ job_enum = dataflow.Job.CurrentStateValueValuesEnum
+ failed_state = [job_enum.JOB_STATE_FAILED,
+ job_enum.JOB_STATE_CANCELLED,
+ job_enum.JOB_STATE_STOPPED,
+ job_enum.JOB_STATE_UNKNOWN,
+ job_enum.JOB_STATE_DRAINED]
+
+ for state in failed_state:
+ pipeline_result = PipelineResult(state)
+ with self.assertRaises(AssertionError):
+ hc_assert_that(pipeline_result, PipelineStateMatcher())
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index e4ef10b..5ebb4b4 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -479,6 +479,25 @@ class SetupOptions(PipelineOptions):
'(--staging_location option) and the workers will install them in '
'same order they were specified on the command line.'))
+
+class TestOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Options for e2e test pipeline.
+ parser.add_argument(
+ '--on_success_matcher',
+ default=None,
+ help=('Verify state/output of e2e test pipeline. This is pickled '
+ 'version of the matcher which should extends '
+ 'hamcrest.core.base_matcher.BaseMatcher.'))
+
+ def validate(self, validator):
+ errors = []
+ if self.view_as(TestOptions).on_success_matcher:
+ errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
+ return errors
+
# TODO(silviuc): Add --files_to_stage option.
# This could potentially replace the --requirements_file and --setup_file.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index c248022..ab42b65 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -19,10 +19,12 @@
"""
import re
+from apache_beam.internal import pickler
from apache_beam.utils.options import DebugOptions
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import SetupOptions
from apache_beam.utils.options import StandardOptions
+from apache_beam.utils.options import TestOptions
from apache_beam.utils.options import TypeOptions
from apache_beam.utils.options import WorkerOptions
@@ -40,7 +42,7 @@ class PipelineOptionsValidator(object):
# Validator will call validate on these subclasses of PipelineOptions
OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
- TypeOptions, WorkerOptions]
+ TypeOptions, WorkerOptions, TestOptions]
# Possible validation errors.
ERR_MISSING_OPTION = 'Missing required option: %s.'
@@ -62,6 +64,12 @@ class PipelineOptionsValidator(object):
'not project description.')
ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
'to be positive.')
+ ERR_INVALID_TEST_MATCHER_TYPE = (
+ 'Invalid value (%s) for option: %s. Please extend your matcher object '
+ 'from hamcrest.core.base_matcher.BaseMatcher.')
+ ERR_INVALID_TEST_MATCHER_UNPICKLABLE = (
+ 'Invalid value (%s) for option: %s. Please make sure the test matcher '
+ 'is unpicklable.')
# GCS path specific patterns.
GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
@@ -98,7 +106,8 @@ class PipelineOptionsValidator(object):
is_service_runner = (self.runner is not None and
type(self.runner).__name__ in [
'DataflowPipelineRunner',
- 'BlockingDataflowPipelineRunner'])
+ 'BlockingDataflowPipelineRunner',
+ 'TestDataflowRunner'])
dataflow_endpoint = (
self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
@@ -165,3 +174,27 @@ class PipelineOptionsValidator(object):
if arg is not None and int(arg) <= 0:
return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
return []
+
+ def validate_test_matcher(self, view, arg_name):
+ """Validates the on_success_matcher argument if it is set.
+
+ Validates that on_success_matcher can be unpickled and is an instance
+ of `hamcrest.core.base_matcher.BaseMatcher`.
+ """
+ # This is a test only method and requires hamcrest
+ from hamcrest.core.base_matcher import BaseMatcher
+ pickled_matcher = view.on_success_matcher
+ errors = []
+ try:
+ matcher = pickler.loads(pickled_matcher)
+ if not isinstance(matcher, BaseMatcher):
+ errors.extend(
+ self._validate_error(
+ self.ERR_INVALID_TEST_MATCHER_TYPE, matcher, arg_name))
+ except: # pylint: disable=bare-except
+ errors.extend(
+ self._validate_error(
+ self.ERR_INVALID_TEST_MATCHER_UNPICKLABLE,
+ pickled_matcher,
+ arg_name))
+ return errors
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 5e93ff6..49028c7 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -20,8 +20,10 @@
import logging
import unittest
+from apache_beam.internal import pickler
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
+from hamcrest.core.base_matcher import BaseMatcher
# Mock runners to use for validations.
@@ -30,10 +32,20 @@ class MockRunners(object):
class DataflowPipelineRunner(object):
pass
+ class TestDataflowRunner(object):
+ pass
+
class OtherRunner(object):
pass
+# Matcher that always passes for testing on_success_matcher option
+class AlwaysPassMatcher(BaseMatcher):
+
+ def _matches(self, item):
+ return True
+
+
class SetupTest(unittest.TestCase):
def check_errors_for_arguments(self, errors, args):
@@ -288,6 +300,35 @@ class SetupTest(unittest.TestCase):
errors = validator.validate()
self.assertFalse(errors)
+ def test_test_matcher(self):
+ def get_validator(matcher):
+ options = ['--project=example:example',
+ '--job_name=job',
+ '--staging_location=gs://foo/bar',
+ '--temp_location=gs://foo/bar',]
+ if matcher:
+ options.append('--on_success_matcher=' + matcher)
+
+ pipeline_options = PipelineOptions(options)
+ runner = MockRunners.TestDataflowRunner()
+ return PipelineOptionsValidator(pipeline_options, runner)
+
+ test_case = [
+ {'on_success_matcher': None,
+ 'errors': []},
+ {'on_success_matcher': pickler.dumps(AlwaysPassMatcher()),
+ 'errors': []},
+ {'on_success_matcher': 'abc',
+ 'errors': ['on_success_matcher']},
+ {'on_success_matcher': pickler.dumps(object),
+ 'errors': ['on_success_matcher']},
+ ]
+
+ for case in test_case:
+ errors = get_validator(case['on_success_matcher']).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, case['errors']), [])
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/setup.cfg
----------------------------------------------------------------------
diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg
index 547a74b..ce8c89e 100644
--- a/sdks/python/setup.cfg
+++ b/sdks/python/setup.cfg
@@ -25,4 +25,3 @@ verbosity=2
# fast_coders module which is not available when running unit tests:
# fast_coders_test and typecoders_test.
exclude=fast_coders_test|typecoders_test
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 4e07968..4a8a3be 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -27,10 +27,10 @@ select = E3
deps=
pep8
pylint
- pyhamcrest
commands =
python --version
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py test
+ # Run unit tests only. Skip integration tests which have attribute 'IT'
+ python setup.py nosetests -a '!IT'
{toxinidir}/run_pylint.sh
passenv = TRAVIS*