You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/22 20:39:24 UTC

[2/3] incubator-beam git commit: [BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5df8da33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5df8da33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5df8da33

Branch: refs/heads/python-sdk
Commit: 5df8da33137f48c7b80301c1d6b4aad96baf8a37
Parents: f10cdef
Author: Mark Liu <ma...@google.com>
Authored: Thu Dec 15 17:41:20 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 12:38:44 2016 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   | 43 +++++++++++++
 sdks/python/apache_beam/runners/__init__.py     |  1 +
 sdks/python/apache_beam/runners/runner.py       | 10 +++-
 sdks/python/apache_beam/runners/runner_test.py  |  4 ++
 .../python/apache_beam/runners/test/__init__.py | 24 ++++++++
 .../runners/test/test_dataflow_runner.py        | 38 ++++++++++++
 sdks/python/apache_beam/test_pipeline.py        | 47 +++++++++++----
 sdks/python/apache_beam/test_pipeline_test.py   | 63 ++++++++++++++++++++
 sdks/python/apache_beam/tests/__init__.py       | 16 +++++
 .../apache_beam/tests/pipeline_verifiers.py     | 50 ++++++++++++++++
 .../tests/pipeline_verifiers_test.py            | 54 +++++++++++++++++
 sdks/python/apache_beam/utils/options.py        | 19 ++++++
 .../utils/pipeline_options_validator.py         | 37 +++++++++++-
 .../utils/pipeline_options_validator_test.py    | 41 +++++++++++++
 sdks/python/setup.cfg                           |  1 -
 sdks/python/tox.ini                             |  4 +-
 16 files changed, 434 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
new file mode 100644
index 0000000..a4e2b16
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the wordcount example."""
+
+import logging
+import unittest
+
+from apache_beam.examples import wordcount
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from nose.plugins.attrib import attr
+
+
+class WordCountIT(unittest.TestCase):
+
+  @attr('IT')
+  def test_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    extra_opts = {'on_success_matcher': PipelineStateMatcher()}
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    test_pipeline = TestPipeline()
+    wordcount.run(test_pipeline.get_test_option_args(**extra_opts))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 40ced50..0c2e9c5 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -26,3 +26,4 @@ from apache_beam.runners.direct.direct_runner import EagerPipelineRunner
 from apache_beam.runners.runner import create_runner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ec15bee..f138260 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -29,6 +29,7 @@ import tempfile
 _KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'EagerPipelineRunner')
 _KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
                            'BlockingDataflowPipelineRunner')
+_KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
 
 
 def create_runner(runner_name):
@@ -36,8 +37,8 @@ def create_runner(runner_name):
 
   Args:
     runner_name: Name of the pipeline runner. Possible values are:
-      DirectPipelineRunner, DataflowPipelineRunner and
-      BlockingDataflowPipelineRunner.
+      DirectPipelineRunner, DataflowPipelineRunner,
+      BlockingDataflowPipelineRunner and TestDataflowRunner.
 
   Returns:
     A runner object.
@@ -49,6 +50,8 @@ def create_runner(runner_name):
     runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name
   elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
     runner_name = 'apache_beam.runners.dataflow_runner.' + runner_name
+  elif runner_name in _KNOWN_TEST_RUNNERS:
+    runner_name = 'apache_beam.runners.test.' + runner_name
 
   if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
@@ -58,7 +61,8 @@ def create_runner(runner_name):
         'Unexpected pipeline runner: %s. Valid values are %s '
         'or the fully qualified name of a PipelineRunner subclass.' % (
             runner_name,
-            ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS)))
+            ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS +
+                      _KNOWN_TEST_RUNNERS)))
 
 
 class PipelineRunner(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 0ba42d3..cc9450e 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -33,6 +33,7 @@ from apache_beam.pipeline import Pipeline
 from apache_beam.runners import create_runner
 from apache_beam.runners import DataflowPipelineRunner
 from apache_beam.runners import DirectPipelineRunner
+from apache_beam.runners import TestDataflowRunner
 import apache_beam.transforms as ptransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.options import PipelineOptions
@@ -56,6 +57,9 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
+    self.assertTrue(
+        isinstance(create_runner('TestDataflowRunner'),
+                   TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_remote_runner_translation(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/test/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/__init__.py b/sdks/python/apache_beam/runners/test/__init__.py
new file mode 100644
index 0000000..6fa483b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/test/__init__.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test runner objects that are only for end-to-end tests.
+
+This package defines runners that are used to execute test pipelines and
+verify their results.
+"""
+
+from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
new file mode 100644
index 0000000..aaf5eb8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
+
+from apache_beam.internal import pickler
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.utils.options import TestOptions
+
+
+class TestDataflowRunner(DataflowPipelineRunner):
+
+  def __init__(self):
+    super(TestDataflowRunner, self).__init__(blocking=True)
+
+  def run(self, pipeline):
+    """Execute test pipeline and verify test matcher"""
+    self.result = super(TestDataflowRunner, self).run(pipeline)
+
+    options = pipeline.options.view_as(TestOptions)
+    if options.on_success_matcher:
+      from hamcrest import assert_that as hc_assert_that
+      hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+    return self.result

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index be64a7a..5794380 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -20,6 +20,7 @@
 import argparse
 import shlex
 
+from apache_beam.internal import pickler
 from apache_beam.pipeline import Pipeline
 from apache_beam.utils.options import PipelineOptions
 
@@ -54,23 +55,49 @@ class TestPipeline(Pipeline):
 
   def __init__(self, runner=None, options=None, argv=None):
     if options is None:
-      options = self.create_pipeline_opt_from_args()
+      options = PipelineOptions(self.get_test_option_args())
     super(TestPipeline, self).__init__(runner, options, argv)
 
-  def create_pipeline_opt_from_args(self):
-    """Create a pipeline options from command line argument:
-    --test-pipeline-options
+  def _append_extra_opts(self, opt_list, extra_opts):
+    """Append extra pipeline options to existing option list.
+
+    A test verifier (if present) is pickled before being appended, and
+    is unpickled later in TestDataflowRunner.
+    """
+    for k, v in extra_opts.items():
+      if not v:
+        continue
+      elif isinstance(v, bool) and v:
+        opt_list.append('--%s' % k)
+      elif 'matcher' in k:
+        opt_list.append('--%s=%s' % (k, pickler.dumps(v)))
+      else:
+        opt_list.append('--%s=%s' % (k, v))
+
+  def get_test_option_args(self, argv=None, **kwargs):
+    """Get pipeline options as an argument list by parsing the value of the
+    command line argument --test-pipeline-options, combined with extra options.
+
+    Args:
+      argv: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+      kwargs: Extra pipeline options for the test.
+
+    Returns:
+      An argument list of options that can be parsed by an argument parser or
+      used directly to build a PipelineOptions object.
     """
     parser = argparse.ArgumentParser()
     parser.add_argument('--test-pipeline-options',
                         type=str,
                         action='store',
                         help='only run tests providing service options')
-    known, unused_argv = parser.parse_known_args()
+    known, unused_argv = parser.parse_known_args(argv)
+
+    options_list = shlex.split(known.test_pipeline_options) \
+      if known.test_pipeline_options else []
 
-    if known.test_pipeline_options:
-      options = shlex.split(known.test_pipeline_options)
-    else:
-      options = []
+    if kwargs:
+      self._append_extra_opts(options_list, kwargs)
 
-    return PipelineOptions(options)
+    return options_list

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
new file mode 100644
index 0000000..747f0ef
--- /dev/null
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for the TestPipeline class"""
+
+import unittest
+
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.utils.options import PipelineOptions
+
+
+class TestPipelineTest(unittest.TestCase):
+
+  TEST_CASE = {'options':
+                   ['--test-pipeline-options', '--job=mockJob --male --age=1'],
+               'expected_list': ['--job=mockJob', '--male', '--age=1'],
+               'expected_dict': {'job': 'mockJob',
+                                 'male': True,
+                                 'age': 1}}
+
+  def setUp(self):
+    self.pipeline = TestPipeline()
+
+  # Used for testing pipeline option creation.
+  class TestParsingOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--job', action='store', help='mock job')
+      parser.add_argument('--male', action='store_true', help='mock gender')
+      parser.add_argument('--age', action='store', type=int, help='mock age')
+
+  def test_option_args_parsing(self):
+    self.assertListEqual(
+        self.pipeline.get_test_option_args(argv=self.TEST_CASE['options']),
+        self.TEST_CASE['expected_list'])
+
+  def test_create_test_pipeline_options(self):
+    test_options = PipelineOptions(
+        self.pipeline.get_test_option_args(self.TEST_CASE['options']))
+    self.assertDictContainsSubset(
+        self.TEST_CASE['expected_dict'], test_options.get_all_options())
+
+  def test_append_extra_options(self):
+    extra_opt = {'name': 'Mark'}
+    options_list = self.pipeline.get_test_option_args(
+        argv=self.TEST_CASE['options'], **extra_opt)
+    expected_list = self.TEST_CASE['expected_list'] + ['--name=Mark']
+    self.assertListEqual(expected_list, options_list)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/__init__.py b/sdks/python/apache_beam/tests/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/tests/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
new file mode 100644
index 0000000..1a6dd45
--- /dev/null
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test result verifiers
+
+A set of verifiers that are used in end-to-end tests to verify state/output
+of the test pipeline job. A customized verifier should extend
+`hamcrest.core.base_matcher.BaseMatcher` and override `_matches`.
+"""
+
+from apache_beam.runners.runner import PipelineState
+from hamcrest.core.base_matcher import BaseMatcher
+
+
+class PipelineStateMatcher(BaseMatcher):
+  """Matcher that verifies the pipeline job terminated in the expected state.
+
+  The matcher compares the pipeline's actual terminal state with the expected
+  one. By default, `PipelineState.DONE` is used as the expected state.
+  """
+
+  def __init__(self, expected_state=PipelineState.DONE):
+    self.expected_state = expected_state
+
+  def _matches(self, pipeline_result):
+    return pipeline_result.current_state() == self.expected_state
+
+  def describe_to(self, description):
+    description \
+      .append_text("Test pipeline expected terminated in state: ") \
+      .append_text(self.expected_state)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Test pipeline job terminated in state: ") \
+      .append_text(pipeline_result.current_state())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
new file mode 100644
index 0000000..e32f603
--- /dev/null
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test pipeline verifiers"""
+
+import logging
+import unittest
+
+from apache_beam.internal.clients import dataflow
+from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from hamcrest import assert_that as hc_assert_that
+
+
+class PipelineVerifiersTest(unittest.TestCase):
+
+  def test_dataflow_job_state_matcher_success(self):
+    """Test PipelineStateMatcher succeeds when job finished in DONE"""
+    pipeline_result = PipelineResult(PipelineState.DONE)
+    hc_assert_that(pipeline_result, PipelineStateMatcher())
+
+  def test_pipeline_state_matcher_fails(self):
+    """Test PipelineStateMatcher fails when job finished in
+    FAILED/CANCELLED/STOPPED/UNKNOWN/DRAINED"""
+    job_enum = dataflow.Job.CurrentStateValueValuesEnum
+    failed_state = [job_enum.JOB_STATE_FAILED,
+                    job_enum.JOB_STATE_CANCELLED,
+                    job_enum.JOB_STATE_STOPPED,
+                    job_enum.JOB_STATE_UNKNOWN,
+                    job_enum.JOB_STATE_DRAINED]
+
+    for state in failed_state:
+      pipeline_result = PipelineResult(state)
+      with self.assertRaises(AssertionError):
+        hc_assert_that(pipeline_result, PipelineStateMatcher())
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index e4ef10b..5ebb4b4 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -479,6 +479,25 @@ class SetupOptions(PipelineOptions):
          '(--staging_location option) and the workers will install them in '
          'same order they were specified on the command line.'))
 
+
+class TestOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for e2e test pipeline.
+    parser.add_argument(
+        '--on_success_matcher',
+        default=None,
+        help=('Verify state/output of e2e test pipeline. This is pickled '
+              'version of the matcher which should extends '
+              'hamcrest.core.base_matcher.BaseMatcher.'))
+
+  def validate(self, validator):
+    errors = []
+    if self.view_as(TestOptions).on_success_matcher:
+      errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
+    return errors
+
 # TODO(silviuc): Add --files_to_stage option.
 # This could potentially replace the --requirements_file and --setup_file.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index c248022..ab42b65 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -19,10 +19,12 @@
 """
 import re
 
+from apache_beam.internal import pickler
 from apache_beam.utils.options import DebugOptions
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import SetupOptions
 from apache_beam.utils.options import StandardOptions
+from apache_beam.utils.options import TestOptions
 from apache_beam.utils.options import TypeOptions
 from apache_beam.utils.options import WorkerOptions
 
@@ -40,7 +42,7 @@ class PipelineOptionsValidator(object):
 
   # Validator will call validate on these subclasses of PipelineOptions
   OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
-             TypeOptions, WorkerOptions]
+             TypeOptions, WorkerOptions, TestOptions]
 
   # Possible validation errors.
   ERR_MISSING_OPTION = 'Missing required option: %s.'
@@ -62,6 +64,12 @@ class PipelineOptionsValidator(object):
       'not project description.')
   ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
                               'to be positive.')
+  ERR_INVALID_TEST_MATCHER_TYPE = (
+      'Invalid value (%s) for option: %s. Please extend your matcher object '
+      'from hamcrest.core.base_matcher.BaseMatcher.')
+  ERR_INVALID_TEST_MATCHER_UNPICKLABLE = (
+      'Invalid value (%s) for option: %s. Please make sure the test matcher '
+      'is unpicklable.')
 
   # GCS path specific patterns.
   GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
@@ -98,7 +106,8 @@ class PipelineOptionsValidator(object):
     is_service_runner = (self.runner is not None and
                          type(self.runner).__name__ in [
                              'DataflowPipelineRunner',
-                             'BlockingDataflowPipelineRunner'])
+                             'BlockingDataflowPipelineRunner',
+                             'TestDataflowRunner'])
 
     dataflow_endpoint = (
         self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
@@ -165,3 +174,27 @@ class PipelineOptionsValidator(object):
     if arg is not None and int(arg) <= 0:
       return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
     return []
+
+  def validate_test_matcher(self, view, arg_name):
+    """Validates the on_success_matcher argument if it is set.
+
+    Validates that on_success_matcher can be unpickled and is an instance
+    of `hamcrest.core.base_matcher.BaseMatcher`.
+    """
+    # This is a test only method and requires hamcrest
+    from hamcrest.core.base_matcher import BaseMatcher
+    pickled_matcher = view.on_success_matcher
+    errors = []
+    try:
+      matcher = pickler.loads(pickled_matcher)
+      if not isinstance(matcher, BaseMatcher):
+        errors.extend(
+            self._validate_error(
+                self.ERR_INVALID_TEST_MATCHER_TYPE, matcher, arg_name))
+    except:   # pylint: disable=bare-except
+      errors.extend(
+          self._validate_error(
+              self.ERR_INVALID_TEST_MATCHER_UNPICKLABLE,
+              pickled_matcher,
+              arg_name))
+    return errors

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 5e93ff6..49028c7 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -20,8 +20,10 @@
 import logging
 import unittest
 
+from apache_beam.internal import pickler
 from apache_beam.utils.options import PipelineOptions
 from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
+from hamcrest.core.base_matcher import BaseMatcher
 
 
 # Mock runners to use for validations.
@@ -30,10 +32,20 @@ class MockRunners(object):
   class DataflowPipelineRunner(object):
     pass
 
+  class TestDataflowRunner(object):
+    pass
+
   class OtherRunner(object):
     pass
 
 
+# Matcher that always passes for testing on_success_matcher option
+class AlwaysPassMatcher(BaseMatcher):
+
+  def _matches(self, item):
+    return True
+
+
 class SetupTest(unittest.TestCase):
 
   def check_errors_for_arguments(self, errors, args):
@@ -288,6 +300,35 @@ class SetupTest(unittest.TestCase):
     errors = validator.validate()
     self.assertFalse(errors)
 
+  def test_test_matcher(self):
+    def get_validator(matcher):
+      options = ['--project=example:example',
+                 '--job_name=job',
+                 '--staging_location=gs://foo/bar',
+                 '--temp_location=gs://foo/bar',]
+      if matcher:
+        options.append('--on_success_matcher=' + matcher)
+
+      pipeline_options = PipelineOptions(options)
+      runner = MockRunners.TestDataflowRunner()
+      return PipelineOptionsValidator(pipeline_options, runner)
+
+    test_case = [
+        {'on_success_matcher': None,
+         'errors': []},
+        {'on_success_matcher': pickler.dumps(AlwaysPassMatcher()),
+         'errors': []},
+        {'on_success_matcher': 'abc',
+         'errors': ['on_success_matcher']},
+        {'on_success_matcher': pickler.dumps(object),
+         'errors': ['on_success_matcher']},
+    ]
+
+    for case in test_case:
+      errors = get_validator(case['on_success_matcher']).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(errors, case['errors']), [])
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/setup.cfg
----------------------------------------------------------------------
diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg
index 547a74b..ce8c89e 100644
--- a/sdks/python/setup.cfg
+++ b/sdks/python/setup.cfg
@@ -25,4 +25,3 @@ verbosity=2
 # fast_coders module which is not available when running unit tests:
 # fast_coders_test and typecoders_test.
 exclude=fast_coders_test|typecoders_test
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5df8da33/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 4e07968..4a8a3be 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -27,10 +27,10 @@ select = E3
 deps=
   pep8
   pylint
-  pyhamcrest
 commands =
   python --version
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py test
+  # Run unit tests only. Skip integration tests which have attribute 'IT'
+  python setup.py nosetests -a '!IT'
   {toxinidir}/run_pylint.sh
 passenv = TRAVIS*