You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/10 03:09:29 UTC

[1/3] beam git commit: [BEAM-2236] Cherry pick #3017 - Move test utilities out of python core

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 aa8c9d10f -> 0cc8b9ed7


http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml
deleted file mode 100644
index 790cacb..0000000
--- a/sdks/python/apache_beam/tests/data/standard_coders.yaml
+++ /dev/null
@@ -1,196 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This file is broken into multiple sections delimited by ---. Each section specifies a set of
-# reference encodings for a single standardized coder used in a specific context.
-#
-# Each section contains up to 3 properties:
-#
-#   coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
-#   nested: a boolean meaning whether the coder was used in the nested context. Missing means to
-#           test both contexts, a shorthand for when the coder is invariant across context.
-#   examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
-#             The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
-#             one of a few standard JSON types such as numbers, strings, dicts that map naturally
-#             to the type encoded by the coder.
-#
-# These choices were made to strike a balance between portability, ease of use, and simple
-# legibility of this file itself.
-#
-# It is expected that future work will move the `coder` field into a format that it would be
-# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
-#
-# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
-
-
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: false
-examples:
-  "abc": abc
-  "ab\0c": "ab\0c"
-
----
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: true
-examples:
-  "\u0003abc": abc
-  "\u0004ab\0c": "ab\0c"
-  "\u00c8\u0001       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|":
-              "       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|"
-
----
-
-coder:
-  urn: "urn:beam:coders:varint:0.1"
-examples:
-  "\0": 0
-  "\u0001": 1
-  "\u000A": 10
-  "\u00c8\u0001": 200
-  "\u00e8\u0007": 1000
-  "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\u0003abc\0": {key: abc, value: 0}
-  "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: false
-examples:
-  "\u0003abcdef": {key: abc, value: def}
-  "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: true
-examples:
-  "\u0003abc\u0003def": {key: abc, value: def}
-  "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:interval_window:0.1"
-examples:
-  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
-  "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
-  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
-  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\0\0\0\u0001\0": [0]
-  "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-examples:
-  "\0\0\0\u0001\u0003abc": ["abc"]
-  "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-  # This is for iterables of unknown length, where the encoding is not
-  # deterministic.
-  non_deterministic: True
-examples:
-  "\u00ff\u00ff\u00ff\u00ff\u0000": []
-  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
-  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\0\0\0\u0001": [""]
-
----
-
-coder:
-  urn: "urn:beam:coders:global_window:0.1"
-examples:
-  "": ""
-
----
-
-# All windowed values consist of pane infos that represent NO_FIRING until full support is added
-# in the Python SDK (BEAM-1522).
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
-    value: 2,
-    timestamp: 1454293425000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: ["global"]
-  }
-
----
-
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:interval_window:0.1"}]
-examples:
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
-    value: 4,
-    timestamp: -400000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 280000}]
-  }
-
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
-    value: 2,
-    timestamp: -100,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
-  }

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
deleted file mode 100644
index df05054..0000000
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""End-to-end test result verifiers
-
-A set of verifiers that are used in end-to-end tests to verify state/output
-of a test pipeline job. Customized verifiers should extend
-`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
-"""
-
-import logging
-import time
-
-from hamcrest.core.base_matcher import BaseMatcher
-
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.runners.runner import PipelineState
-from apache_beam.tests import test_utils as utils
-from apache_beam.utils import retry
-
-try:
-  from apitools.base.py.exceptions import HttpError
-except ImportError:
-  HttpError = None
-
-MAX_RETRIES = 4
-
-
-class PipelineStateMatcher(BaseMatcher):
-  """Matcher that verifies the pipeline job terminated in the expected state
-
-  Compares the pipeline's actual terminal state with the expected state.
-  By default, `PipelineState.DONE` is used as expected state.
-  """
-
-  def __init__(self, expected_state=PipelineState.DONE):
-    self.expected_state = expected_state
-
-  def _matches(self, pipeline_result):
-    return pipeline_result.state == self.expected_state
-
-  def describe_to(self, description):
-    description \
-      .append_text("Test pipeline expected terminated in state: ") \
-      .append_text(self.expected_state)
-
-  def describe_mismatch(self, pipeline_result, mismatch_description):
-    mismatch_description \
-      .append_text("Test pipeline job terminated in state: ") \
-      .append_text(pipeline_result.state)
-
-
-def retry_on_io_error_and_server_error(exception):
-  """Filter allowing retries on file I/O errors and service errors."""
-  return isinstance(exception, IOError) or \
-          (HttpError is not None and isinstance(exception, HttpError))
-
-
-class FileChecksumMatcher(BaseMatcher):
-  """Matcher that verifies file(s) content by comparing file checksum.
-
-  Uses apache_beam.io.filesystems.FileSystems to read file(s) matching the
-  given path. The checksum is a hash string computed from the file content.
-  """
-
-  def __init__(self, file_path, expected_checksum, sleep_secs=None):
-    """Initialize a FileChecksumMatcher object
-
-    Args:
-      file_path : A string that is the full path of output file. This path
-        can contain globs.
-      expected_checksum : A hash string that is computed from expected
-        result.
-      sleep_secs : Number of seconds to wait before verification starts.
-        Extra time is given to make sure output files are ready on the FS.
-    """
-    if sleep_secs is not None:
-      if isinstance(sleep_secs, int):
-        self.sleep_secs = sleep_secs
-      else:
-        raise ValueError('Sleep seconds, if received, must be int. '
-                         'But received: %r, %s' % (sleep_secs,
-                                                   type(sleep_secs)))
-    else:
-      self.sleep_secs = None
-
-    self.file_path = file_path
-    self.expected_checksum = expected_checksum
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry_on_io_error_and_server_error)
-  def _read_with_retry(self):
-    """Read path with retry if I/O failed"""
-    read_lines = []
-    match_result = FileSystems.match([self.file_path])[0]
-    matched_path = [f.path for f in match_result.metadata_list]
-    if not matched_path:
-      raise IOError('No such file or directory: %s' % self.file_path)
-
-    logging.info('Find %d files in %s: \n%s',
-                 len(matched_path), self.file_path, '\n'.join(matched_path))
-    for path in matched_path:
-      with FileSystems.open(path, 'r') as f:
-        for line in f:
-          read_lines.append(line)
-    return read_lines
-
-  def _matches(self, _):
-    if self.sleep_secs:
-      # Wait to have output file ready on FS
-      logging.info('Wait %d seconds...', self.sleep_secs)
-      time.sleep(self.sleep_secs)
-
-    # Read from given file(s) path
-    read_lines = self._read_with_retry()
-
-    # Compute checksum
-    self.checksum = utils.compute_hash(read_lines)
-    logging.info('Read from given path %s, %d lines, checksum: %s.',
-                 self.file_path, len(read_lines), self.checksum)
-    return self.checksum == self.expected_checksum
-
-  def describe_to(self, description):
-    description \
-      .append_text("Expected checksum is ") \
-      .append_text(self.expected_checksum)
-
-  def describe_mismatch(self, pipeline_result, mismatch_description):
-    mismatch_description \
-      .append_text("Actual checksum is ") \
-      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
deleted file mode 100644
index 909917d..0000000
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ /dev/null
@@ -1,148 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the test pipeline verifiers"""
-
-import logging
-import tempfile
-import unittest
-
-from hamcrest import assert_that as hc_assert_that
-from mock import Mock, patch
-
-from apache_beam.io.localfilesystem import LocalFileSystem
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.tests import pipeline_verifiers as verifiers
-from apache_beam.tests.test_utils import patch_retry
-
-try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.exceptions import HttpError
-  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
-except ImportError:
-  HttpError = None
-  GCSFileSystem = None
-
-
-class PipelineVerifiersTest(unittest.TestCase):
-
-  def setUp(self):
-    self._mock_result = Mock()
-    patch_retry(self, verifiers)
-
-  def test_pipeline_state_matcher_success(self):
-    """Test PipelineStateMatcher succeeds when using default expected state
-    and job actually finished in DONE
-    """
-    pipeline_result = PipelineResult(PipelineState.DONE)
-    hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
-
-  def test_pipeline_state_matcher_given_state(self):
-    """Test PipelineStateMatcher succeeds when it matches the given state"""
-    pipeline_result = PipelineResult(PipelineState.FAILED)
-    hc_assert_that(pipeline_result,
-                   verifiers.PipelineStateMatcher(PipelineState.FAILED))
-
-  def test_pipeline_state_matcher_fails(self):
-    """Test PipelineStateMatcher fails when using default expected state
-    and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN
-    """
-    failed_state = [PipelineState.CANCELLED,
-                    PipelineState.DRAINED,
-                    PipelineState.FAILED,
-                    PipelineState.STOPPED,
-                    PipelineState.UNKNOWN]
-
-    for state in failed_state:
-      pipeline_result = PipelineResult(state)
-      with self.assertRaises(AssertionError):
-        hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
-
-  test_cases = [
-      {'content': 'Test FileChecksumMatcher with single file',
-       'num_files': 1,
-       'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
-      {'content': 'Test FileChecksumMatcher with multiple files',
-       'num_files': 3,
-       'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
-      {'content': '',
-       'num_files': 1,
-       'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
-  ]
-
-  def create_temp_file(self, content, directory=None):
-    with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
-      f.write(content)
-      return f.name
-
-  def test_file_checksum_matcher_success(self):
-    for case in self.test_cases:
-      temp_dir = tempfile.mkdtemp()
-      for _ in range(case['num_files']):
-        self.create_temp_file(case['content'], temp_dir)
-      matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
-                                              case['expected_checksum'])
-      hc_assert_that(self._mock_result, matcher)
-
-  @patch.object(LocalFileSystem, 'match')
-  def test_file_checksum_matcher_read_failed(self, mock_match):
-    mock_match.side_effect = IOError('No file found.')
-    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
-    with self.assertRaises(IOError):
-      hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_match.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
-
-  @patch.object(GCSFileSystem, 'match')
-  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
-  def test_file_checksum_matcher_service_error(self, mock_match):
-    mock_match.side_effect = HttpError(
-        response={'status': '404'}, url='', content='Not Found',
-    )
-    matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
-    with self.assertRaises(HttpError):
-      hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_match.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
-
-  def test_file_checksum_matchcer_invalid_sleep_time(self):
-    with self.assertRaises(ValueError) as cm:
-      verifiers.FileChecksumMatcher('file_path',
-                                    'expected_checksum',
-                                    'invalid_sleep_time')
-    self.assertEqual(cm.exception.message,
-                     'Sleep seconds, if received, must be int. '
-                     'But received: \'invalid_sleep_time\', '
-                     '<type \'str\'>')
-
-  @patch('time.sleep', return_value=None)
-  def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
-    temp_dir = tempfile.mkdtemp()
-    case = self.test_cases[0]
-    self.create_temp_file(case['content'], temp_dir)
-    matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
-                                            case['expected_checksum'],
-                                            10)
-    hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mocked_sleep.called)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
deleted file mode 100644
index 666207e..0000000
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Utility methods for testing"""
-
-import hashlib
-import imp
-from mock import Mock, patch
-
-from apache_beam.utils import retry
-
-DEFAULT_HASHING_ALG = 'sha1'
-
-
-def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
-  """Compute a hash value from a list of strings."""
-  content.sort()
-  m = hashlib.new(hashing_alg)
-  for elem in content:
-    m.update(str(elem))
-  return m.hexdigest()
-
-
-def patch_retry(testcase, module):
-  """A function to patch retry module to use mock clock and logger.
-
-  The clock and logger defined in the retry decorator are replaced in tests
-  in order to skip the sleep phase when a retry happens.
-
-  Args:
-    testcase: An instance of unittest.TestCase that calls this function to
-      patch retry module.
-    module: The module that uses retry and needs to be replaced with mock
-      clock and logger in test.
-  """
-  real_retry_with_exponential_backoff = retry.with_exponential_backoff
-
-  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
-    """A patch for retry decorator to use a mock dummy clock and logger."""
-    return real_retry_with_exponential_backoff(
-        num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
-        clock=Mock())
-
-  patch.object(retry, 'with_exponential_backoff',
-               side_effect=patched_retry_with_exponential_backoff).start()
-
-  # Reload module after patching.
-  imp.reload(module)
-
-  def remove_patches():
-    patch.stopall()
-    # Reload module again after removing patch.
-    imp.reload(module)
-
-  testcase.addCleanup(remove_patches)

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index af76889..1822c19 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -22,7 +22,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 2352acd..9ede4c7 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -22,7 +22,7 @@ from apache_beam.io import source_test_utils
 
 from apache_beam import Create, assert_that, equal_to
 from apache_beam.coders import FastPrimitivesCoder
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class CreateTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5948460..137992d 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -32,7 +32,7 @@ from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.io.iobase import Read
 from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index bf9aeff..0bc9107 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -23,7 +23,7 @@ import unittest
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 from apache_beam.transforms.util import assert_that, equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 38871fe..2574c4b 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -26,7 +26,7 @@ import yaml
 
 import apache_beam as beam
 from apache_beam.runners import pipeline_context
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 9656827..7fdef70 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam import Create
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to, is_empty
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 2d2b03d..a7797dd 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam.runners import pipeline_context
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 3d7fbd9..27e7caa 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -22,7 +22,7 @@ import unittest
 import apache_beam as beam
 
 from apache_beam.io import iobase
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, is_empty
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3812fb1..3494cfe 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -23,10 +23,10 @@ import unittest
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.options.pipeline_options import OptionsContext
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
-from apache_beam.options.pipeline_options import OptionsContext
 
 # These test often construct a pipeline as value | PTransform to test side
 # effects (e.g. errors).

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/utils/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py
deleted file mode 100644
index 7ae27b7..0000000
--- a/sdks/python/apache_beam/utils/test_stream.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Provides TestStream for verifying streaming runner semantics."""
-
-from abc import ABCMeta
-from abc import abstractmethod
-
-from apache_beam import coders
-from apache_beam import pvalue
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils import timestamp
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-class Event(object):
-  """Test stream event to be emitted during execution of a TestStream."""
-
-  __metaclass__ = ABCMeta
-
-  def __cmp__(self, other):
-    if type(self) is not type(other):
-      return cmp(type(self), type(other))
-    return self._typed_cmp(other)
-
-  @abstractmethod
-  def _typed_cmp(self, other):
-    raise NotImplementedError
-
-
-class ElementEvent(Event):
-  """Element-producing test stream event."""
-
-  def __init__(self, timestamped_values):
-    self.timestamped_values = timestamped_values
-
-  def _typed_cmp(self, other):
-    return cmp(self.timestamped_values, other.timestamped_values)
-
-
-class WatermarkEvent(Event):
-  """Watermark-advancing test stream event."""
-
-  def __init__(self, new_watermark):
-    self.new_watermark = timestamp.Timestamp.of(new_watermark)
-
-  def _typed_cmp(self, other):
-    return cmp(self.new_watermark, other.new_watermark)
-
-
-class ProcessingTimeEvent(Event):
-  """Processing time-advancing test stream event."""
-
-  def __init__(self, advance_by):
-    self.advance_by = timestamp.Duration.of(advance_by)
-
-  def _typed_cmp(self, other):
-    return cmp(self.advance_by, other.advance_by)
-
-
-class TestStream(PTransform):
-  """Test stream that generates events on an unbounded PCollection of elements.
-
-  Each event emits elements, advances the watermark or advances the processing
-  time.  After all of the specified elements are emitted, ceases to produce
-  output.
-  """
-
-  def __init__(self, coder=coders.FastPrimitivesCoder):
-    assert coder is not None
-    self.coder = coder
-    self.current_watermark = timestamp.MIN_TIMESTAMP
-    self.events = []
-
-  def expand(self, pbegin):
-    assert isinstance(pbegin, pvalue.PBegin)
-    self.pipeline = pbegin.pipeline
-    return pvalue.PCollection(self.pipeline)
-
-  def _infer_output_coder(self, input_type=None, input_coder=None):
-    return self.coder
-
-  def _add(self, event):
-    if isinstance(event, ElementEvent):
-      for tv in event.timestamped_values:
-        assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
-            'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
-    elif isinstance(event, WatermarkEvent):
-      assert event.new_watermark > self.current_watermark, (
-          'Watermark must strictly-monotonically advance.')
-      self.current_watermark = event.new_watermark
-    elif isinstance(event, ProcessingTimeEvent):
-      assert event.advance_by > 0, (
-          'Must advance processing time by positive amount.')
-    else:
-      raise ValueError('Unknown event: %s' % event)
-    self.events.append(event)
-
-  def add_elements(self, elements):
-    """Add elements to the TestStream.
-
-    Elements added to the TestStream will be produced during pipeline execution.
-    These elements can be TimestampedValue, WindowedValue or raw unwrapped
-    elements that are serializable using the TestStream's specified Coder.  When
-    a TimestampedValue or a WindowedValue element is used, the timestamp of the
-    TimestampedValue or WindowedValue will be the timestamp of the produced
-    element; otherwise, the current watermark timestamp will be used for that
-    element.  The windows of a given WindowedValue are ignored by the
-    TestStream.
-    """
-    timestamped_values = []
-    for element in elements:
-      if isinstance(element, TimestampedValue):
-        timestamped_values.append(element)
-      elif isinstance(element, WindowedValue):
-        # Drop windows for elements in test stream.
-        timestamped_values.append(
-            TimestampedValue(element.value, element.timestamp))
-      else:
-        # Add elements with timestamp equal to current watermark.
-        timestamped_values.append(
-            TimestampedValue(element, self.current_watermark))
-    self._add(ElementEvent(timestamped_values))
-    return self
-
-  def advance_watermark_to(self, new_watermark):
-    """Advance the watermark to a given Unix timestamp.
-
-    The Unix timestamp value used must be later than the previous watermark
-    value and should be given as an int, float or utils.timestamp.Timestamp
-    object.
-    """
-    self._add(WatermarkEvent(new_watermark))
-    return self
-
-  def advance_watermark_to_infinity(self):
-    """Advance the watermark to the end of time."""
-    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
-    return self
-
-  def advance_processing_time(self, advance_by):
-    """Advance the current processing time by a given duration in seconds.
-
-    The duration must be a positive second duration and should be given as an
-    int, float or utils.timestamp.Duration object.
-    """
-    self._add(ProcessingTimeEvent(advance_by))
-    return self

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/utils/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream_test.py b/sdks/python/apache_beam/utils/test_stream_test.py
deleted file mode 100644
index b5b5c69..0000000
--- a/sdks/python/apache_beam/utils/test_stream_test.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the test_stream module."""
-
-import unittest
-
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils import timestamp
-from apache_beam.utils.test_stream import ElementEvent
-from apache_beam.utils.test_stream import ProcessingTimeEvent
-from apache_beam.utils.test_stream import TestStream
-from apache_beam.utils.test_stream import WatermarkEvent
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-class TestStreamTest(unittest.TestCase):
-
-  def test_basic_test_stream(self):
-    test_stream = (TestStream()
-                   .advance_watermark_to(0)
-                   .add_elements([
-                       'a',
-                       WindowedValue('b', 3, []),
-                       TimestampedValue('c', 6)])
-                   .advance_processing_time(10)
-                   .advance_watermark_to(8)
-                   .add_elements(['d'])
-                   .advance_watermark_to_infinity())
-    self.assertEqual(
-        test_stream.events,
-        [
-            WatermarkEvent(0),
-            ElementEvent([
-                TimestampedValue('a', 0),
-                TimestampedValue('b', 3),
-                TimestampedValue('c', 6),
-            ]),
-            ProcessingTimeEvent(10),
-            WatermarkEvent(8),
-            ElementEvent([
-                TimestampedValue('d', 8),
-            ]),
-            WatermarkEvent(timestamp.MAX_TIMESTAMP),
-        ]
-    )
-
-  def test_test_stream_errors(self):
-    with self.assertRaises(AssertionError, msg=(
-        'Watermark must strictly-monotonically advance.')):
-      _ = (TestStream()
-           .advance_watermark_to(5)
-           .advance_watermark_to(4))
-
-    with self.assertRaises(AssertionError, msg=(
-        'Must advance processing time by positive amount.')):
-      _ = (TestStream()
-           .advance_processing_time(-1))
-
-    with self.assertRaises(AssertionError, msg=(
-        'Element timestamp must be before timestamp.MAX_TIMESTAMP.')):
-      _ = (TestStream()
-           .add_elements([
-               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
-           ]))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 681abbf..9bf3cf4 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -122,7 +122,7 @@ setuptools.setup(
     author_email=PACKAGE_EMAIL,
     packages=setuptools.find_packages(),
     package_data={'apache_beam': [
-        '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', 'tests/data/*']},
+        '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', 'testing/data/*']},
     ext_modules=cythonize([
         'apache_beam/**/*.pyx',
         'apache_beam/coders/coder_impl.py',


[2/3] beam git commit: [BEAM-2236] Cherry pick #3017 - Move test utilities out of python core

Posted by al...@apache.org.
[BEAM-2236] Cherry pick #3017 - Move test utilities out of python core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ecebd22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ecebd22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ecebd22

Branch: refs/heads/release-2.0.0
Commit: 2ecebd22e3071d0ba6d3647273367d16582ca852
Parents: aa8c9d1
Author: Mark Liu <ma...@google.com>
Authored: Tue May 9 16:41:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 9 20:08:55 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/coders/standard_coders_test.py  |   2 +-
 .../examples/complete/autocomplete_test.py      |   2 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/game/hourly_team_score_test.py     |   2 +-
 .../examples/complete/game/user_score_test.py   |   2 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../cookbook/bigquery_side_input_test.py        |   2 +-
 .../cookbook/bigquery_tornadoes_it_test.py      |   4 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../examples/cookbook/coders_test.py            |   2 +-
 .../examples/cookbook/combiners_test.py         |   2 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 .../examples/snippets/snippets_test.py          |   2 +-
 .../apache_beam/examples/wordcount_it_test.py   |   6 +-
 sdks/python/apache_beam/io/avroio_test.py       |   2 +-
 .../python/apache_beam/io/concat_source_test.py |   2 +-
 .../apache_beam/io/filebasedsource_test.py      |   6 +-
 sdks/python/apache_beam/io/fileio_test.py       |   2 +-
 .../io/gcp/datastore/v1/helper_test.py          |   2 +-
 .../io/gcp/tests/bigquery_matcher.py            |   2 +-
 .../io/gcp/tests/bigquery_matcher_test.py       |   2 +-
 sdks/python/apache_beam/io/sources_test.py      |   2 +-
 sdks/python/apache_beam/io/textio_test.py       |   2 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |   2 +-
 sdks/python/apache_beam/pipeline_test.py        |   2 +-
 sdks/python/apache_beam/pvalue_test.py          |   2 +-
 .../runners/dataflow/dataflow_runner_test.py    |   4 +-
 sdks/python/apache_beam/test_pipeline.py        | 163 ---------------
 sdks/python/apache_beam/test_pipeline_test.py   | 112 -----------
 sdks/python/apache_beam/testing/__init__.py     |  16 ++
 .../apache_beam/testing/data/privatekey.p12     | Bin 0 -> 2452 bytes
 .../testing/data/standard_coders.yaml           | 196 +++++++++++++++++++
 .../apache_beam/testing/pipeline_verifiers.py   | 146 ++++++++++++++
 .../testing/pipeline_verifiers_test.py          | 148 ++++++++++++++
 .../python/apache_beam/testing/test_pipeline.py | 163 +++++++++++++++
 .../apache_beam/testing/test_pipeline_test.py   | 112 +++++++++++
 sdks/python/apache_beam/testing/test_stream.py  | 163 +++++++++++++++
 .../apache_beam/testing/test_stream_test.py     |  83 ++++++++
 sdks/python/apache_beam/testing/test_utils.py   |  69 +++++++
 sdks/python/apache_beam/tests/__init__.py       |  16 --
 .../apache_beam/tests/data/privatekey.p12       | Bin 2452 -> 0 bytes
 .../apache_beam/tests/data/standard_coders.yaml | 196 -------------------
 .../apache_beam/tests/pipeline_verifiers.py     | 146 --------------
 .../tests/pipeline_verifiers_test.py            | 148 --------------
 sdks/python/apache_beam/tests/test_utils.py     |  69 -------
 .../apache_beam/transforms/combiners_test.py    |   2 +-
 .../apache_beam/transforms/create_test.py       |   2 +-
 .../apache_beam/transforms/ptransform_test.py   |   2 +-
 .../apache_beam/transforms/sideinputs_test.py   |   2 +-
 .../apache_beam/transforms/trigger_test.py      |   2 +-
 sdks/python/apache_beam/transforms/util_test.py |   2 +-
 .../apache_beam/transforms/window_test.py       |   2 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |   4 +-
 sdks/python/apache_beam/utils/test_stream.py    | 163 ---------------
 .../apache_beam/utils/test_stream_test.py       |  83 --------
 sdks/python/setup.py                            |   2 +-
 60 files changed, 1143 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 4a48ed9..885e88f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -34,7 +34,7 @@ from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms import window
 
 STANDARD_CODERS_YAML = os.path.join(
-    os.path.dirname(__file__), '..', 'tests', 'data', 'standard_coders.yaml')
+    os.path.dirname(__file__), '..', 'testing', 'data', 'standard_coders.yaml')
 
 
 def _load_test_cases(test_yaml):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index d59d0f5..438633a 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,7 +21,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index dc5b901..12d8379 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -21,7 +21,7 @@ import logging
 import unittest
 
 from apache_beam.examples.complete import estimate_pi
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import BeamAssertException
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index 0eaa8c6..bd0abca 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -21,8 +21,8 @@ import logging
 import unittest
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
 from apache_beam.examples.complete.game import hourly_team_score
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class HourlyTeamScoreTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 750729d..2db53bd 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -21,8 +21,8 @@ import logging
 import unittest
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
 from apache_beam.examples.complete.game import user_score
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class UserScoreTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 05e53a4..0e30254 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -25,7 +25,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import tfidf
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 EXPECTED_RESULTS = set([

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 9b9d9b1..4850c04 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -23,7 +23,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import top_wikipedia_sessions
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class ComputeTopSessionsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 5869976..1ca25c9 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -22,7 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_side_input
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class BigQuerySideInputTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index 3e302d1..5d2ee7c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -26,8 +26,8 @@ from nose.plugins.attrib import attr
 
 from apache_beam.examples.cookbook import bigquery_tornadoes
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class BigqueryTornadoesIT(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 0c66d7e..ca7ca9e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -22,7 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_tornadoes
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class BigQueryTornadoesTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 4a92abb..35cf252 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -22,7 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import coders
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index a8ed555..45c779f 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -27,7 +27,7 @@ import logging
 import unittest
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class CombinersTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index cd1c04a..2d35d8d 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -22,7 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 28bb1e1..44a352f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -22,7 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import filters
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class FiltersTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 26af71d..1bdb9a3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -31,8 +31,8 @@ string. The tags can contain only letters, digits and _.
 """
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
 from apache_beam.metrics import Metrics
+from apache_beam.testing.test_pipeline import TestPipeline
 
 # Quiet some pylint warnings that happen because of the somewhat special
 # format for the code snippets.

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index da0a962..85d8bde 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -36,7 +36,7 @@ from apache_beam.examples.snippets import snippets
 from apache_beam.utils.windowed_value import WindowedValue
 
 # pylint: disable=expression-not-assigned
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 54e54e8..4bee127 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -25,9 +25,9 @@ from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
 from apache_beam.examples import wordcount
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class WordCountIT(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 5f2db62..4a21839 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -26,7 +26,7 @@ from apache_beam.io import iobase
 from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 807c3fd..a02f9ad 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -26,7 +26,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.io.concat_source import ConcatSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 4ff23fc..e17a004 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -38,13 +38,13 @@ from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.options.value_provider import StaticValueProvider
-from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 class LineSource(FileBasedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index e0e9774..4c25505 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -31,7 +31,7 @@ import mock
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 5d4bb6f..a804c09 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -23,7 +23,7 @@ from mock import MagicMock
 
 from apache_beam.io.gcp.datastore.v1 import fake_datastore
 from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.tests.test_utils import patch_retry
+from apache_beam.testing.test_utils import patch_retry
 
 
 # Protect against environments where apitools library is not available.

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 66d99b3..f42b70f 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -21,7 +21,7 @@ import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
-from apache_beam.tests.test_utils import compute_hash
+from apache_beam.testing.test_utils import compute_hash
 from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index d8aa148..f12293e 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -24,7 +24,7 @@ from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
 from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier
-from apache_beam.tests.test_utils import patch_retry
+from apache_beam.testing.test_utils import patch_retry
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f92756..c0b8ad6 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,7 +27,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 90dc665..d00afef 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -43,7 +43,7 @@ from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
 from apache_beam.io.filesystem import CompressionTypes
 
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 29a9fb8..b7e370d 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -35,7 +35,7 @@ from apache_beam.io.tfrecordio import _TFRecordSource
 from apache_beam.io.tfrecordio import _TFRecordUtil
 from apache_beam.io.tfrecordio import ReadFromTFRecord
 from apache_beam.io.tfrecordio import WriteToTFRecord
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 import crcmod
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index ebcc43b..c6b1e48 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -32,7 +32,7 @@ from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.pvalue import AsSingleton
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 529ddf7..4acbc52 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam.pvalue import PValue
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class PValueTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index a9f61a7..b61a683 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -26,6 +26,7 @@ import mock
 import apache_beam as beam
 import apache_beam.transforms as ptransform
 
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import Pipeline, AppliedPTransform
 from apache_beam.pvalue import PCollection
 from apache_beam.runners import create_runner
@@ -34,10 +35,9 @@ from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.typehints import typehints
-from apache_beam.options.pipeline_options import PipelineOptions
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
deleted file mode 100644
index 20f4839..0000000
--- a/sdks/python/apache_beam/test_pipeline.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Test Pipeline, a wrapper of Pipeline for test purpose"""
-
-import argparse
-import shlex
-
-from apache_beam.internal import pickler
-from apache_beam.pipeline import Pipeline
-from apache_beam.runners.runner import PipelineState
-from apache_beam.options.pipeline_options import PipelineOptions
-from nose.plugins.skip import SkipTest
-
-
-class TestPipeline(Pipeline):
-  """TestPipeline class is used inside of Beam tests that can be configured to
-  run against pipeline runner.
-
-  It can parse arguments from the command line and build pipeline options for
-  tests that run against a pipeline runner and utilize resources of that
-  runner. Such test functions should be tagged with the
-  @attr("ValidatesRunner") annotation.
-
-  To configure the test with customized pipeline options from the command
-  line, the argument '--test-pipeline-options' can be used to supply a list
-  of pipeline options. If no options are specified, default values are used.
-
-  For example, use following command line to execute all ValidatesRunner tests::
-
-    python setup.py nosetests -a ValidatesRunner \
-        --test-pipeline-options="--runner=DirectRunner \
-                                 --job_name=myJobName \
-                                 --num_workers=1"
-
-  For example, use assert_that for test validation::
-
-    pipeline = TestPipeline()
-    pcoll = ...
-    assert_that(pcoll, equal_to(...))
-    pipeline.run()
-  """
-
-  def __init__(self,
-               runner=None,
-               options=None,
-               argv=None,
-               is_integration_test=False,
-               blocking=True):
-    """Initialize a pipeline object for test.
-
-    Args:
-      runner: An object of type 'PipelineRunner' that will be used to execute
-        the pipeline. For registered runners, the runner name can be specified,
-        otherwise a runner object must be supplied.
-      options: A configured 'PipelineOptions' object containing arguments
-        that should be used for running the pipeline job.
-      argv: A list of arguments (such as sys.argv) to be used for building a
-        'PipelineOptions' object. This will only be used if argument 'options'
-        is None.
-      is_integration_test: True if the test is an integration test, False
-        otherwise.
-      blocking: Run method will wait until pipeline execution is completed.
-
-    Raises:
-      ValueError: if either the runner or options argument is not of the
-      expected type.
-    """
-    self.is_integration_test = is_integration_test
-    self.options_list = self._parse_test_option_args(argv)
-    self.blocking = blocking
-    if options is None:
-      options = PipelineOptions(self.options_list)
-    super(TestPipeline, self).__init__(runner, options)
-
-  def run(self):
-    result = super(TestPipeline, self).run()
-    if self.blocking:
-      state = result.wait_until_finish()
-      assert state == PipelineState.DONE, "Pipeline execution failed."
-
-    return result
-
-  def _parse_test_option_args(self, argv):
-    """Parse value of command line argument: --test-pipeline-options to get
-    pipeline options.
-
-    Args:
-      argv: An iterable of command line arguments to be used. If not specified
-        then sys.argv will be used as input for parsing arguments.
-
-    Returns:
-      An argument list of options that can be parsed by argparser or directly
-      build a pipeline option.
-    """
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--test-pipeline-options',
-                        type=str,
-                        action='store',
-                        help='only run tests providing service options')
-    known, unused_argv = parser.parse_known_args(argv)
-
-    if self.is_integration_test and not known.test_pipeline_options:
-      # Skip the integration test when '--test-pipeline-options' is not
-      # specified, since nose also calls integration tests when unit tests
-      # are run via 'setup.py test'.
-      raise SkipTest('IT is skipped because --test-pipeline-options '
-                     'is not specified')
-
-    return shlex.split(known.test_pipeline_options) \
-      if known.test_pipeline_options else []
-
-  def get_full_options_as_args(self, **extra_opts):
-    """Get full pipeline options as an argument list.
-
-    Append extra pipeline options to the existing option list if provided.
-    A test verifier (if contained in the extra options) is pickled before
-    appending, and will be unpickled later in the TestRunner.
-    """
-    options = list(self.options_list)
-    for k, v in extra_opts.items():
-      if not v:
-        continue
-      elif isinstance(v, bool) and v:
-        options.append('--%s' % k)
-      elif 'matcher' in k:
-        options.append('--%s=%s' % (k, pickler.dumps(v)))
-      else:
-        options.append('--%s=%s' % (k, v))
-    return options
-
-  def get_option(self, opt_name):
-    """Get a pipeline option value by name
-
-    Args:
-      opt_name: The name of the pipeline option.
-
-    Returns:
-      None if option is not found in existing option list which is generated
-      by parsing value of argument `test-pipeline-options`.
-    """
-    parser = argparse.ArgumentParser()
-    opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name
-    # Option name should start with '--' when it's used for parsing.
-    parser.add_argument('--' + opt_name,
-                        type=str,
-                        action='store')
-    known, _ = parser.parse_known_args(self.options_list)
-    return getattr(known, opt_name) if hasattr(known, opt_name) else None

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
deleted file mode 100644
index 325cab7..0000000
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit test for the TestPipeline class"""
-
-import logging
-import unittest
-
-from hamcrest.core.base_matcher import BaseMatcher
-from hamcrest.core.assert_that import assert_that as hc_assert_that
-
-from apache_beam.internal import pickler
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.options.pipeline_options import PipelineOptions
-
-
-# A simple matcher that is used for testing extra options appending.
-class SimpleMatcher(BaseMatcher):
-  def _matches(self, item):
-    return True
-
-
-class TestPipelineTest(unittest.TestCase):
-
-  TEST_CASE = {'options':
-                   ['--test-pipeline-options', '--job=mockJob --male --age=1'],
-               'expected_list': ['--job=mockJob', '--male', '--age=1'],
-               'expected_dict': {'job': 'mockJob',
-                                 'male': True,
-                                 'age': 1}}
-
-  # Used for testing pipeline option creation.
-  class TestParsingOptions(PipelineOptions):
-
-    @classmethod
-    def _add_argparse_args(cls, parser):
-      parser.add_argument('--job', action='store', help='mock job')
-      parser.add_argument('--male', action='store_true', help='mock gender')
-      parser.add_argument('--age', action='store', type=int, help='mock age')
-
-  def test_option_args_parsing(self):
-    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
-    self.assertListEqual(
-        sorted(test_pipeline.get_full_options_as_args()),
-        sorted(self.TEST_CASE['expected_list']))
-
-  def test_empty_option_args_parsing(self):
-    test_pipeline = TestPipeline()
-    self.assertListEqual([],
-                         test_pipeline.get_full_options_as_args())
-
-  def test_create_test_pipeline_options(self):
-    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
-    test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
-    self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
-                                  test_options.get_all_options())
-
-  EXTRA_OPT_CASES = [
-      {'options': {'name': 'Mark'},
-       'expected': ['--name=Mark']},
-      {'options': {'student': True},
-       'expected': ['--student']},
-      {'options': {'student': False},
-       'expected': []},
-      {'options': {'name': 'Mark', 'student': True},
-       'expected': ['--name=Mark', '--student']}
-  ]
-
-  def test_append_extra_options(self):
-    test_pipeline = TestPipeline()
-    for case in self.EXTRA_OPT_CASES:
-      opt_list = test_pipeline.get_full_options_as_args(**case['options'])
-      self.assertListEqual(sorted(opt_list), sorted(case['expected']))
-
-  def test_append_verifier_in_extra_opt(self):
-    extra_opt = {'matcher': SimpleMatcher()}
-    opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
-    _, value = opt_list[0].split('=', 1)
-    matcher = pickler.loads(value)
-    self.assertTrue(isinstance(matcher, BaseMatcher))
-    hc_assert_that(None, matcher)
-
-  def test_get_option(self):
-    name, value = ('job', 'mockJob')
-    test_pipeline = TestPipeline()
-    test_pipeline.options_list = ['--%s=%s' % (name, value)]
-    self.assertEqual(test_pipeline.get_option(name), value)
-
-  def test_skip_IT(self):
-    test_pipeline = TestPipeline(is_integration_test=True)
-    test_pipeline.run()
-    # Note that this will never be reached since it should be skipped above.
-    self.fail()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/__init__.py b/sdks/python/apache_beam/testing/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/testing/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/privatekey.p12
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/data/privatekey.p12 b/sdks/python/apache_beam/testing/data/privatekey.p12
new file mode 100644
index 0000000..c369ecb
Binary files /dev/null and b/sdks/python/apache_beam/testing/data/privatekey.p12 differ

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/data/standard_coders.yaml b/sdks/python/apache_beam/testing/data/standard_coders.yaml
new file mode 100644
index 0000000..790cacb
--- /dev/null
+++ b/sdks/python/apache_beam/testing/data/standard_coders.yaml
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is broken into multiple sections delimited by ---. Each section specifies a set of
+# reference encodings for a single standardized coder used in a specific context.
+#
+# Each section contains up to 3 properties:
+#
+#   coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
+#   nested: a boolean meaning whether the coder was used in the nested context. Missing means to
+#           test both contexts, a shorthand for when the coder is invariant across context.
+#   examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
+#             The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
+#             one of a few standard JSON types such as numbers, strings, dicts that map naturally
+#             to the type encoded by the coder.
+#
+# These choices were made to strike a balance between portability, ease of use, and simple
+# legibility of this file itself.
+#
+# It is expected that future work will move the `coder` field into a format that it would be
+# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
+
+
+
+coder:
+  urn: "urn:beam:coders:bytes:0.1"
+nested: false
+examples:
+  "abc": abc
+  "ab\0c": "ab\0c"
+
+---
+
+coder:
+  urn: "urn:beam:coders:bytes:0.1"
+nested: true
+examples:
+  "\u0003abc": abc
+  "\u0004ab\0c": "ab\0c"
+  "\u00c8\u0001       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|":
+              "       10|       20|       30|       40|       50|       60|       70|       80|       90|      100|      110|      120|      130|      140|      150|      160|      170|      180|      190|      200|"
+
+---
+
+coder:
+  urn: "urn:beam:coders:varint:0.1"
+examples:
+  "\0": 0
+  "\u0001": 1
+  "\u000A": 10
+  "\u00c8\u0001": 200
+  "\u00e8\u0007": 1000
+  "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:varint:0.1"}]
+examples:
+  "\u0003abc\0": {key: abc, value: 0}
+  "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:bytes:0.1"}]
+nested: false
+examples:
+  "\u0003abcdef": {key: abc, value: def}
+  "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:kv:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"},
+               {urn: "urn:beam:coders:bytes:0.1"}]
+nested: true
+examples:
+  "\u0003abc\u0003def": {key: abc, value: def}
+  "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:interval_window:0.1"
+examples:
+  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
+  "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
+  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
+  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"}]
+examples:
+  "\0\0\0\u0001\0": [0]
+  "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
+  "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+examples:
+  "\0\0\0\u0001\u0003abc": ["abc"]
+  "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
+  "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+  # This is for iterables of unknown length, where the encoding is not
+  # deterministic.
+  non_deterministic: True
+examples:
+  "\u00ff\u00ff\u00ff\u00ff\u0000": []
+  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\0\0\0\u0001": [""]
+
+---
+
+coder:
+  urn: "urn:beam:coders:global_window:0.1"
+examples:
+  "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+    value: 2,
+    timestamp: 1454293425000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: ["global"]
+  }
+
+---
+
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
+    value: 4,
+    timestamp: -400000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 280000}]
+  }
+
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
+    value: 2,
+    timestamp: -100,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+  }

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
new file mode 100644
index 0000000..5a6082a
--- /dev/null
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test result verifiers
+
+A set of verifiers that are used in end-to-end tests to verify state/output
+of test pipeline job. Customized verifier should extend
+`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
+"""
+
+import logging
+import time
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils as utils
+from apache_beam.utils import retry
+
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
+MAX_RETRIES = 4
+
+
+class PipelineStateMatcher(BaseMatcher):
+  """Matcher that verifies the pipeline job terminated in the expected state
+
+  Compares the actual terminal state of the pipeline with the expected one.
+  By default, `PipelineState.DONE` is used as expected state.
+  """
+
+  def __init__(self, expected_state=PipelineState.DONE):
+    self.expected_state = expected_state
+
+  def _matches(self, pipeline_result):
+    return pipeline_result.state == self.expected_state
+
+  def describe_to(self, description):
+    description \
+      .append_text("Test pipeline expected terminated in state: ") \
+      .append_text(self.expected_state)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Test pipeline job terminated in state: ") \
+      .append_text(pipeline_result.state)
+
+
+def retry_on_io_error_and_server_error(exception):
+  """Filter allowing retries on file I/O errors and service error."""
+  return isinstance(exception, IOError) or \
+          (HttpError is not None and isinstance(exception, HttpError))
+
+
+class FileChecksumMatcher(BaseMatcher):
+  """Matcher that verifies file(s) content by comparing file checksum.
+
+  Uses apache_beam.io.filesystems.FileSystems to fetch file(s) from the given
+  path. File checksum is a hash string computed from content of file(s).
+  """
+
+  def __init__(self, file_path, expected_checksum, sleep_secs=None):
+    """Initialize a FileChecksumMatcher object
+
+    Args:
+      file_path : A string that is the full path of output file. This path
+        can contain globs.
+      expected_checksum : A hash string that is computed from expected
+        result.
+      sleep_secs : Number of seconds to wait before verification starts.
+        Extra time is given to make sure output files are ready on FS.
+    """
+    if sleep_secs is not None:
+      if isinstance(sleep_secs, int):
+        self.sleep_secs = sleep_secs
+      else:
+        raise ValueError('Sleep seconds, if received, must be int. '
+                         'But received: %r, %s' % (sleep_secs,
+                                                   type(sleep_secs)))
+    else:
+      self.sleep_secs = None
+
+    self.file_path = file_path
+    self.expected_checksum = expected_checksum
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_io_error_and_server_error)
+  def _read_with_retry(self):
+    """Read path with retry if I/O failed"""
+    read_lines = []
+    match_result = FileSystems.match([self.file_path])[0]
+    matched_path = [f.path for f in match_result.metadata_list]
+    if not matched_path:
+      raise IOError('No such file or directory: %s' % self.file_path)
+
+    logging.info('Find %d files in %s: \n%s',
+                 len(matched_path), self.file_path, '\n'.join(matched_path))
+    for path in matched_path:
+      with FileSystems.open(path, 'r') as f:
+        for line in f:
+          read_lines.append(line)
+    return read_lines
+
+  def _matches(self, _):
+    if self.sleep_secs:
+      # Wait to have output file ready on FS
+      logging.info('Wait %d seconds...', self.sleep_secs)
+      time.sleep(self.sleep_secs)
+
+    # Read from given file(s) path
+    read_lines = self._read_with_retry()
+
+    # Compute checksum
+    self.checksum = utils.compute_hash(read_lines)
+    logging.info('Read from given path %s, %d lines, checksum: %s.',
+                 self.file_path, len(read_lines), self.checksum)
+    return self.checksum == self.expected_checksum
+
+  def describe_to(self, description):
+    description \
+      .append_text("Expected checksum is ") \
+      .append_text(self.expected_checksum)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Actual checksum is ") \
+      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers_test.py b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
new file mode 100644
index 0000000..15e0a04
--- /dev/null
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test pipeline verifiers"""
+
+import logging
+import tempfile
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.localfilesystem import LocalFileSystem
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_utils import patch_retry
+from apache_beam.testing import pipeline_verifiers as verifiers
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from apitools.base.py.exceptions import HttpError
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  HttpError = None
+  GCSFileSystem = None
+
+
+class PipelineVerifiersTest(unittest.TestCase):
+
+  def setUp(self):
+    self._mock_result = Mock()
+    patch_retry(self, verifiers)
+
+  def test_pipeline_state_matcher_success(self):
+    """Test PipelineStateMatcher succeeds when using default expected state
+    and job actually finished in DONE
+    """
+    pipeline_result = PipelineResult(PipelineState.DONE)
+    hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+  def test_pipeline_state_matcher_given_state(self):
+    """Test PipelineStateMatcher succeeds when it matches the given state"""
+    pipeline_result = PipelineResult(PipelineState.FAILED)
+    hc_assert_that(pipeline_result,
+                   verifiers.PipelineStateMatcher(PipelineState.FAILED))
+
+  def test_pipeline_state_matcher_fails(self):
+    """Test PipelineStateMatcher fails when using default expected state
+    and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN
+    """
+    failed_state = [PipelineState.CANCELLED,
+                    PipelineState.DRAINED,
+                    PipelineState.FAILED,
+                    PipelineState.STOPPED,
+                    PipelineState.UNKNOWN]
+
+    for state in failed_state:
+      pipeline_result = PipelineResult(state)
+      with self.assertRaises(AssertionError):
+        hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+  test_cases = [
+      {'content': 'Test FileChecksumMatcher with single file',
+       'num_files': 1,
+       'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
+      {'content': 'Test FileChecksumMatcher with multiple files',
+       'num_files': 3,
+       'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
+      {'content': '',
+       'num_files': 1,
+       'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
+  ]
+
+  def create_temp_file(self, content, directory=None):
+    with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
+      f.write(content)
+      return f.name
+
+  def test_file_checksum_matcher_success(self):
+    for case in self.test_cases:
+      temp_dir = tempfile.mkdtemp()
+      for _ in range(case['num_files']):
+        self.create_temp_file(case['content'], temp_dir)
+      matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+                                              case['expected_checksum'])
+      hc_assert_that(self._mock_result, matcher)
+
+  @patch.object(LocalFileSystem, 'match')
+  def test_file_checksum_matcher_read_failed(self, mock_match):
+    mock_match.side_effect = IOError('No file found.')
+    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+    with self.assertRaises(IOError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
+
+  @patch.object(GCSFileSystem, 'match')
+  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
+  def test_file_checksum_matcher_service_error(self, mock_match):
+    mock_match.side_effect = HttpError(
+        response={'status': '404'}, url='', content='Not Found',
+    )
+    matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+    with self.assertRaises(HttpError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
+
+  def test_file_checksum_matchcer_invalid_sleep_time(self):
+    with self.assertRaises(ValueError) as cm:
+      verifiers.FileChecksumMatcher('file_path',
+                                    'expected_checksum',
+                                    'invalid_sleep_time')
+    self.assertEqual(cm.exception.message,
+                     'Sleep seconds, if received, must be int. '
+                     'But received: \'invalid_sleep_time\', '
+                     '<type \'str\'>')
+
+  @patch('time.sleep', return_value=None)
+  def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
+    temp_dir = tempfile.mkdtemp()
+    case = self.test_cases[0]
+    self.create_temp_file(case['content'], temp_dir)
+    matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+                                            case['expected_checksum'],
+                                            10)
+    hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mocked_sleep.called)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
new file mode 100644
index 0000000..20f4839
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test Pipeline, a wrapper of Pipeline for test purpose"""
+
+import argparse
+import shlex
+
+from apache_beam.internal import pickler
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
+from apache_beam.options.pipeline_options import PipelineOptions
+from nose.plugins.skip import SkipTest
+
+
+class TestPipeline(Pipeline):
+  """TestPipeline class is used inside of Beam tests that can be configured to
+  run against pipeline runner.
+
+  It can parse arguments from the command line and build pipeline options for
+  tests that run against a pipeline runner and utilize resources of that
+  runner. Such test functions are recommended to be tagged with the
+  @attr("ValidatesRunner") annotation.
+
+  In order to configure the test with customized pipeline options from the
+  command line, the argument '--test-pipeline-options' can be used to supply a
+  list of pipeline options. If no options are specified, defaults are used.
+
+  For example, use following command line to execute all ValidatesRunner tests::
+
+    python setup.py nosetests -a ValidatesRunner \
+        --test-pipeline-options="--runner=DirectRunner \
+                                 --job_name=myJobName \
+                                 --num_workers=1"
+
+  For example, use assert_that for test validation::
+
+    pipeline = TestPipeline()
+    pcoll = ...
+    assert_that(pcoll, equal_to(...))
+    pipeline.run()
+  """
+
+  def __init__(self,
+               runner=None,
+               options=None,
+               argv=None,
+               is_integration_test=False,
+               blocking=True):
+    """Initialize a pipeline object for test.
+
+    Args:
+      runner: An object of type 'PipelineRunner' that will be used to execute
+        the pipeline. For registered runners, the runner name can be specified,
+        otherwise a runner object must be supplied.
+      options: A configured 'PipelineOptions' object containing arguments
+        that should be used for running the pipeline job.
+      argv: A list of arguments (such as sys.argv) to be used for building a
+        'PipelineOptions' object. This will only be used if argument 'options'
+        is None.
+      is_integration_test: True if the test is an integration test, False
+        otherwise.
+      blocking: Run method will wait until pipeline execution is completed.
+
+    Raises:
+      ValueError: if either the runner or options argument is not of the
+      expected type.
+    """
+    self.is_integration_test = is_integration_test
+    self.options_list = self._parse_test_option_args(argv)
+    self.blocking = blocking
+    if options is None:
+      options = PipelineOptions(self.options_list)
+    super(TestPipeline, self).__init__(runner, options)
+
+  def run(self):
+    result = super(TestPipeline, self).run()
+    if self.blocking:
+      state = result.wait_until_finish()
+      assert state == PipelineState.DONE, "Pipeline execution failed."
+
+    return result
+
+  def _parse_test_option_args(self, argv):
+    """Parse value of command line argument: --test-pipeline-options to get
+    pipeline options.
+
+    Args:
+      argv: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+
+    Returns:
+      An argument list of options that can be parsed by argparser or directly
+      build a pipeline option.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--test-pipeline-options',
+                        type=str,
+                        action='store',
+                        help='only run tests providing service options')
+    known, unused_argv = parser.parse_known_args(argv)
+
+    if self.is_integration_test and not known.test_pipeline_options:
+      # Skip integration test when argument '--test-pipeline-options' is not
+      # specified since nose calls integration tests when running unit tests
+      # via 'setup.py test'.
+      raise SkipTest('IT is skipped because --test-pipeline-options '
+                     'is not specified')
+
+    return shlex.split(known.test_pipeline_options) \
+      if known.test_pipeline_options else []
+
+  def get_full_options_as_args(self, **extra_opts):
+    """Get full pipeline options as an argument list.
+
+    Append extra pipeline options to the existing option list if provided.
+    A test verifier (if contained in the extra options) is pickled before
+    appending, and will be unpickled later in the TestRunner.
+    """
+    options = list(self.options_list)
+    for k, v in extra_opts.items():
+      if not v:
+        continue
+      elif isinstance(v, bool) and v:
+        options.append('--%s' % k)
+      elif 'matcher' in k:
+        options.append('--%s=%s' % (k, pickler.dumps(v)))
+      else:
+        options.append('--%s=%s' % (k, v))
+    return options
+
+  def get_option(self, opt_name):
+    """Get a pipeline option value by name
+
+    Args:
+      opt_name: The name of the pipeline option, with or without leading '--'.
+
+    Returns:
+      The option value as a string, or None if the option is not found in the
+      option list generated by parsing argument `test-pipeline-options`.
+    """
+    parser = argparse.ArgumentParser()
+    opt_name = opt_name[2:] if opt_name[:2] == '--' else opt_name
+    # Option name should start with '--' when it's used for parsing.
+    parser.add_argument('--' + opt_name,
+                        type=str,
+                        action='store')
+    known, _ = parser.parse_known_args(self.options_list)
+    return getattr(known, opt_name) if hasattr(known, opt_name) else None

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline_test.py b/sdks/python/apache_beam/testing/test_pipeline_test.py
new file mode 100644
index 0000000..747d64c7
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_pipeline_test.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for the TestPipeline class"""
+
+import logging
+import unittest
+
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.assert_that import assert_that as hc_assert_that
+
+from apache_beam.internal import pickler
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+# A simple matcher that is used for testing extra options appending.
+class SimpleMatcher(BaseMatcher):
+  def _matches(self, item):
+    return True
+
+
+class TestPipelineTest(unittest.TestCase):
+
+  TEST_CASE = {'options':
+                   ['--test-pipeline-options', '--job=mockJob --male --age=1'],
+               'expected_list': ['--job=mockJob', '--male', '--age=1'],
+               'expected_dict': {'job': 'mockJob',
+                                 'male': True,
+                                 'age': 1}}
+
+  # Used for testing pipeline option creation.
+  class TestParsingOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--job', action='store', help='mock job')
+      parser.add_argument('--male', action='store_true', help='mock gender')
+      parser.add_argument('--age', action='store', type=int, help='mock age')
+
+  def test_option_args_parsing(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    self.assertListEqual(
+        sorted(test_pipeline.get_full_options_as_args()),
+        sorted(self.TEST_CASE['expected_list']))
+
+  def test_empty_option_args_parsing(self):
+    test_pipeline = TestPipeline()
+    self.assertListEqual([],
+                         test_pipeline.get_full_options_as_args())
+
+  def test_create_test_pipeline_options(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
+    self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
+                                  test_options.get_all_options())
+
+  EXTRA_OPT_CASES = [
+      {'options': {'name': 'Mark'},
+       'expected': ['--name=Mark']},
+      {'options': {'student': True},
+       'expected': ['--student']},
+      {'options': {'student': False},
+       'expected': []},
+      {'options': {'name': 'Mark', 'student': True},
+       'expected': ['--name=Mark', '--student']}
+  ]
+
+  def test_append_extra_options(self):
+    test_pipeline = TestPipeline()
+    for case in self.EXTRA_OPT_CASES:
+      opt_list = test_pipeline.get_full_options_as_args(**case['options'])
+      self.assertListEqual(sorted(opt_list), sorted(case['expected']))
+
+  def test_append_verifier_in_extra_opt(self):
+    extra_opt = {'matcher': SimpleMatcher()}
+    opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
+    _, value = opt_list[0].split('=', 1)
+    matcher = pickler.loads(value)
+    self.assertTrue(isinstance(matcher, BaseMatcher))
+    hc_assert_that(None, matcher)
+
+  def test_get_option(self):
+    name, value = ('job', 'mockJob')
+    test_pipeline = TestPipeline()
+    test_pipeline.options_list = ['--%s=%s' % (name, value)]
+    self.assertEqual(test_pipeline.get_option(name), value)
+
+  def test_skip_IT(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    test_pipeline.run()
+    # Note that this will never be reached since it should be skipped above.
+    self.fail()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
new file mode 100644
index 0000000..7ae27b7
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides TestStream for verifying streaming runner semantics."""
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+from apache_beam import coders
+from apache_beam import pvalue
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class Event(object):
+  """Test stream event to be emitted during execution of a TestStream."""
+
+  __metaclass__ = ABCMeta
+
+  def __cmp__(self, other):
+    if type(self) is not type(other):
+      return cmp(type(self), type(other))
+    return self._typed_cmp(other)
+
+  @abstractmethod
+  def _typed_cmp(self, other):
+    raise NotImplementedError
+
+
+class ElementEvent(Event):
+  """Element-producing test stream event."""
+
+  def __init__(self, timestamped_values):
+    self.timestamped_values = timestamped_values
+
+  def _typed_cmp(self, other):
+    return cmp(self.timestamped_values, other.timestamped_values)
+
+
+class WatermarkEvent(Event):
+  """Watermark-advancing test stream event."""
+
+  def __init__(self, new_watermark):
+    self.new_watermark = timestamp.Timestamp.of(new_watermark)
+
+  def _typed_cmp(self, other):
+    return cmp(self.new_watermark, other.new_watermark)
+
+
+class ProcessingTimeEvent(Event):
+  """Processing time-advancing test stream event."""
+
+  def __init__(self, advance_by):
+    self.advance_by = timestamp.Duration.of(advance_by)
+
+  def _typed_cmp(self, other):
+    return cmp(self.advance_by, other.advance_by)
+
+
+class TestStream(PTransform):
+  """Test stream that generates events on an unbounded PCollection of elements.
+
+  Each event emits elements, advances the watermark or advances the processing
+  time.  After all of the specified elements are emitted, ceases to produce
+  output.
+  """
+
+  def __init__(self, coder=coders.FastPrimitivesCoder):
+    assert coder is not None
+    self.coder = coder
+    self.current_watermark = timestamp.MIN_TIMESTAMP
+    self.events = []
+
+  def expand(self, pbegin):
+    assert isinstance(pbegin, pvalue.PBegin)
+    self.pipeline = pbegin.pipeline
+    return pvalue.PCollection(self.pipeline)
+
+  def _infer_output_coder(self, input_type=None, input_coder=None):
+    return self.coder
+
+  def _add(self, event):
+    if isinstance(event, ElementEvent):
+      for tv in event.timestamped_values:
+        assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
+            'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
+    elif isinstance(event, WatermarkEvent):
+      assert event.new_watermark > self.current_watermark, (
+          'Watermark must strictly-monotonically advance.')
+      self.current_watermark = event.new_watermark
+    elif isinstance(event, ProcessingTimeEvent):
+      assert event.advance_by > 0, (
+          'Must advance processing time by positive amount.')
+    else:
+      raise ValueError('Unknown event: %s' % event)
+    self.events.append(event)
+
+  def add_elements(self, elements):
+    """Add elements to the TestStream.
+
+    Elements added to the TestStream will be produced during pipeline execution.
+    These elements can be TimestampedValue, WindowedValue or raw unwrapped
+    elements that are serializable using the TestStream's specified Coder.  When
+    a TimestampedValue or a WindowedValue element is used, the timestamp of the
+    TimestampedValue or WindowedValue will be the timestamp of the produced
+    element; otherwise, the current watermark timestamp will be used for that
+    element.  The windows of a given WindowedValue are ignored by the
+    TestStream.
+    """
+    timestamped_values = []
+    for element in elements:
+      if isinstance(element, TimestampedValue):
+        timestamped_values.append(element)
+      elif isinstance(element, WindowedValue):
+        # Drop windows for elements in test stream.
+        timestamped_values.append(
+            TimestampedValue(element.value, element.timestamp))
+      else:
+        # Add elements with timestamp equal to current watermark.
+        timestamped_values.append(
+            TimestampedValue(element, self.current_watermark))
+    self._add(ElementEvent(timestamped_values))
+    return self
+
+  def advance_watermark_to(self, new_watermark):
+    """Advance the watermark to a given Unix timestamp.
+
+    The Unix timestamp value used must be later than the previous watermark
+    value and should be given as an int, float or utils.timestamp.Timestamp
+    object.
+    """
+    self._add(WatermarkEvent(new_watermark))
+    return self
+
+  def advance_watermark_to_infinity(self):
+    """Advance the watermark to the end of time."""
+    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
+    return self
+
+  def advance_processing_time(self, advance_by):
+    """Advance the current processing time by a given duration in seconds.
+
+    The duration must be a positive second duration and should be given as an
+    int, float or utils.timestamp.Duration object.
+    """
+    self._add(ProcessingTimeEvent(advance_by))
+    return self

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
new file mode 100644
index 0000000..e32dda2
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test_stream module."""
+
+import unittest
+
+from apache_beam.testing.test_stream import ElementEvent
+from apache_beam.testing.test_stream import ProcessingTimeEvent
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_stream import WatermarkEvent
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class TestStreamTest(unittest.TestCase):
+
+  def test_basic_test_stream(self):
+    test_stream = (TestStream()
+                   .advance_watermark_to(0)
+                   .add_elements([
+                       'a',
+                       WindowedValue('b', 3, []),
+                       TimestampedValue('c', 6)])
+                   .advance_processing_time(10)
+                   .advance_watermark_to(8)
+                   .add_elements(['d'])
+                   .advance_watermark_to_infinity())
+    self.assertEqual(
+        test_stream.events,
+        [
+            WatermarkEvent(0),
+            ElementEvent([
+                TimestampedValue('a', 0),
+                TimestampedValue('b', 3),
+                TimestampedValue('c', 6),
+            ]),
+            ProcessingTimeEvent(10),
+            WatermarkEvent(8),
+            ElementEvent([
+                TimestampedValue('d', 8),
+            ]),
+            WatermarkEvent(timestamp.MAX_TIMESTAMP),
+        ]
+    )
+
+  def test_test_stream_errors(self):
+    with self.assertRaises(AssertionError, msg=(
+        'Watermark must strictly-monotonically advance.')):
+      _ = (TestStream()
+           .advance_watermark_to(5)
+           .advance_watermark_to(4))
+
+    with self.assertRaises(AssertionError, msg=(
+        'Must advance processing time by positive amount.')):
+      _ = (TestStream()
+           .advance_processing_time(-1))
+
+    with self.assertRaises(AssertionError, msg=(
+        'Element timestamp must be before timestamp.MAX_TIMESTAMP.')):
+      _ = (TestStream()
+           .add_elements([
+               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
+           ]))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
new file mode 100644
index 0000000..666207e
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility methods for testing"""
+
+import hashlib
+import imp
+from mock import Mock, patch
+
+from apache_beam.utils import retry
+
+DEFAULT_HASHING_ALG = 'sha1'
+
+
+def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
+  """Compute a hash value from a list of strings."""
+  content.sort()
+  m = hashlib.new(hashing_alg)
+  for elem in content:
+    m.update(str(elem))
+  return m.hexdigest()
+
+
+def patch_retry(testcase, module):
+  """A function to patch retry module to use mock clock and logger.
+
+  The clock and logger used by the retry decorator are replaced with mocks
+  in tests in order to skip the sleep phase when a retry happens.
+
+  Args:
+    testcase: An instance of unittest.TestCase that calls this function to
+      patch retry module.
+    module: The module that uses retry; it is reloaded so that it picks up
+      the mock clock and logger in tests.
+  """
+  real_retry_with_exponential_backoff = retry.with_exponential_backoff
+
+  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+    """A patch for the retry decorator that uses a mock clock and logger."""
+    return real_retry_with_exponential_backoff(
+        num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
+        clock=Mock())
+
+  patch.object(retry, 'with_exponential_backoff',
+               side_effect=patched_retry_with_exponential_backoff).start()
+
+  # Reload module after patching.
+  imp.reload(module)
+
+  def remove_patches():
+    patch.stopall()
+    # Reload module again after removing patch.
+    imp.reload(module)
+
+  testcase.addCleanup(remove_patches)

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/__init__.py b/sdks/python/apache_beam/tests/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/tests/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/data/privatekey.p12
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/privatekey.p12 b/sdks/python/apache_beam/tests/data/privatekey.p12
deleted file mode 100644
index c369ecb..0000000
Binary files a/sdks/python/apache_beam/tests/data/privatekey.p12 and /dev/null differ


[3/3] beam git commit: This closes #3031

Posted by al...@apache.org.
This closes #3031


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cc8b9ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cc8b9ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cc8b9ed

Branch: refs/heads/release-2.0.0
Commit: 0cc8b9ed7f9e0af27a5801e5fd2a6cffd4049afe
Parents: aa8c9d1 2ecebd2
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 9 20:09:17 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 9 20:09:17 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/coders/standard_coders_test.py  |   2 +-
 .../examples/complete/autocomplete_test.py      |   2 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/game/hourly_team_score_test.py     |   2 +-
 .../examples/complete/game/user_score_test.py   |   2 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../cookbook/bigquery_side_input_test.py        |   2 +-
 .../cookbook/bigquery_tornadoes_it_test.py      |   4 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../examples/cookbook/coders_test.py            |   2 +-
 .../examples/cookbook/combiners_test.py         |   2 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 .../examples/snippets/snippets_test.py          |   2 +-
 .../apache_beam/examples/wordcount_it_test.py   |   6 +-
 sdks/python/apache_beam/io/avroio_test.py       |   2 +-
 .../python/apache_beam/io/concat_source_test.py |   2 +-
 .../apache_beam/io/filebasedsource_test.py      |   6 +-
 sdks/python/apache_beam/io/fileio_test.py       |   2 +-
 .../io/gcp/datastore/v1/helper_test.py          |   2 +-
 .../io/gcp/tests/bigquery_matcher.py            |   2 +-
 .../io/gcp/tests/bigquery_matcher_test.py       |   2 +-
 sdks/python/apache_beam/io/sources_test.py      |   2 +-
 sdks/python/apache_beam/io/textio_test.py       |   2 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |   2 +-
 sdks/python/apache_beam/pipeline_test.py        |   2 +-
 sdks/python/apache_beam/pvalue_test.py          |   2 +-
 .../runners/dataflow/dataflow_runner_test.py    |   4 +-
 sdks/python/apache_beam/test_pipeline.py        | 163 ---------------
 sdks/python/apache_beam/test_pipeline_test.py   | 112 -----------
 sdks/python/apache_beam/testing/__init__.py     |  16 ++
 .../apache_beam/testing/data/privatekey.p12     | Bin 0 -> 2452 bytes
 .../testing/data/standard_coders.yaml           | 196 +++++++++++++++++++
 .../apache_beam/testing/pipeline_verifiers.py   | 146 ++++++++++++++
 .../testing/pipeline_verifiers_test.py          | 148 ++++++++++++++
 .../python/apache_beam/testing/test_pipeline.py | 163 +++++++++++++++
 .../apache_beam/testing/test_pipeline_test.py   | 112 +++++++++++
 sdks/python/apache_beam/testing/test_stream.py  | 163 +++++++++++++++
 .../apache_beam/testing/test_stream_test.py     |  83 ++++++++
 sdks/python/apache_beam/testing/test_utils.py   |  69 +++++++
 sdks/python/apache_beam/tests/__init__.py       |  16 --
 .../apache_beam/tests/data/privatekey.p12       | Bin 2452 -> 0 bytes
 .../apache_beam/tests/data/standard_coders.yaml | 196 -------------------
 .../apache_beam/tests/pipeline_verifiers.py     | 146 --------------
 .../tests/pipeline_verifiers_test.py            | 148 --------------
 sdks/python/apache_beam/tests/test_utils.py     |  69 -------
 .../apache_beam/transforms/combiners_test.py    |   2 +-
 .../apache_beam/transforms/create_test.py       |   2 +-
 .../apache_beam/transforms/ptransform_test.py   |   2 +-
 .../apache_beam/transforms/sideinputs_test.py   |   2 +-
 .../apache_beam/transforms/trigger_test.py      |   2 +-
 sdks/python/apache_beam/transforms/util_test.py |   2 +-
 .../apache_beam/transforms/window_test.py       |   2 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |   4 +-
 sdks/python/apache_beam/utils/test_stream.py    | 163 ---------------
 .../apache_beam/utils/test_stream_test.py       |  83 --------
 sdks/python/setup.py                            |   2 +-
 60 files changed, 1143 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------