You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/10 03:09:30 UTC
[2/3] beam git commit: [BEAM-2236] Cherry pick #3017 - Move test
utilities out of python core
[BEAM-2236] Cherry pick #3017 - Move test utilities out of python core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ecebd22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ecebd22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ecebd22
Branch: refs/heads/release-2.0.0
Commit: 2ecebd22e3071d0ba6d3647273367d16582ca852
Parents: aa8c9d1
Author: Mark Liu <ma...@google.com>
Authored: Tue May 9 16:41:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 9 20:08:55 2017 -0700
----------------------------------------------------------------------
.../apache_beam/coders/standard_coders_test.py | 2 +-
.../examples/complete/autocomplete_test.py | 2 +-
.../examples/complete/estimate_pi_test.py | 2 +-
.../complete/game/hourly_team_score_test.py | 2 +-
.../examples/complete/game/user_score_test.py | 2 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../cookbook/bigquery_side_input_test.py | 2 +-
.../cookbook/bigquery_tornadoes_it_test.py | 4 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../examples/cookbook/coders_test.py | 2 +-
.../examples/cookbook/combiners_test.py | 2 +-
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../examples/cookbook/filters_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 2 +-
.../examples/snippets/snippets_test.py | 2 +-
.../apache_beam/examples/wordcount_it_test.py | 6 +-
sdks/python/apache_beam/io/avroio_test.py | 2 +-
.../python/apache_beam/io/concat_source_test.py | 2 +-
.../apache_beam/io/filebasedsource_test.py | 6 +-
sdks/python/apache_beam/io/fileio_test.py | 2 +-
.../io/gcp/datastore/v1/helper_test.py | 2 +-
.../io/gcp/tests/bigquery_matcher.py | 2 +-
.../io/gcp/tests/bigquery_matcher_test.py | 2 +-
sdks/python/apache_beam/io/sources_test.py | 2 +-
sdks/python/apache_beam/io/textio_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 2 +-
sdks/python/apache_beam/pipeline_test.py | 2 +-
sdks/python/apache_beam/pvalue_test.py | 2 +-
.../runners/dataflow/dataflow_runner_test.py | 4 +-
sdks/python/apache_beam/test_pipeline.py | 163 ---------------
sdks/python/apache_beam/test_pipeline_test.py | 112 -----------
sdks/python/apache_beam/testing/__init__.py | 16 ++
.../apache_beam/testing/data/privatekey.p12 | Bin 0 -> 2452 bytes
.../testing/data/standard_coders.yaml | 196 +++++++++++++++++++
.../apache_beam/testing/pipeline_verifiers.py | 146 ++++++++++++++
.../testing/pipeline_verifiers_test.py | 148 ++++++++++++++
.../python/apache_beam/testing/test_pipeline.py | 163 +++++++++++++++
.../apache_beam/testing/test_pipeline_test.py | 112 +++++++++++
sdks/python/apache_beam/testing/test_stream.py | 163 +++++++++++++++
.../apache_beam/testing/test_stream_test.py | 83 ++++++++
sdks/python/apache_beam/testing/test_utils.py | 69 +++++++
sdks/python/apache_beam/tests/__init__.py | 16 --
.../apache_beam/tests/data/privatekey.p12 | Bin 2452 -> 0 bytes
.../apache_beam/tests/data/standard_coders.yaml | 196 -------------------
.../apache_beam/tests/pipeline_verifiers.py | 146 --------------
.../tests/pipeline_verifiers_test.py | 148 --------------
sdks/python/apache_beam/tests/test_utils.py | 69 -------
.../apache_beam/transforms/combiners_test.py | 2 +-
.../apache_beam/transforms/create_test.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 2 +-
.../apache_beam/transforms/sideinputs_test.py | 2 +-
.../apache_beam/transforms/trigger_test.py | 2 +-
sdks/python/apache_beam/transforms/util_test.py | 2 +-
.../apache_beam/transforms/window_test.py | 2 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 4 +-
sdks/python/apache_beam/utils/test_stream.py | 163 ---------------
.../apache_beam/utils/test_stream_test.py | 83 --------
sdks/python/setup.py | 2 +-
60 files changed, 1143 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 4a48ed9..885e88f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -34,7 +34,7 @@ from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms import window
STANDARD_CODERS_YAML = os.path.join(
- os.path.dirname(__file__), '..', 'tests', 'data', 'standard_coders.yaml')
+ os.path.dirname(__file__), '..', 'testing', 'data', 'standard_coders.yaml')
def _load_test_cases(test_yaml):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index d59d0f5..438633a 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,7 +21,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index dc5b901..12d8379 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -21,7 +21,7 @@ import logging
import unittest
from apache_beam.examples.complete import estimate_pi
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import BeamAssertException
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index 0eaa8c6..bd0abca 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -21,8 +21,8 @@ import logging
import unittest
import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
from apache_beam.examples.complete.game import hourly_team_score
+from apache_beam.testing.test_pipeline import TestPipeline
class HourlyTeamScoreTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 750729d..2db53bd 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -21,8 +21,8 @@ import logging
import unittest
import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
from apache_beam.examples.complete.game import user_score
+from apache_beam.testing.test_pipeline import TestPipeline
class UserScoreTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 05e53a4..0e30254 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -25,7 +25,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import tfidf
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
EXPECTED_RESULTS = set([
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 9b9d9b1..4850c04 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -23,7 +23,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import top_wikipedia_sessions
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class ComputeTopSessionsTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 5869976..1ca25c9 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -22,7 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_side_input
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class BigQuerySideInputTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index 3e302d1..5d2ee7c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -26,8 +26,8 @@ from nose.plugins.attrib import attr
from apache_beam.examples.cookbook import bigquery_tornadoes
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
class BigqueryTornadoesIT(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 0c66d7e..ca7ca9e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -22,7 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_tornadoes
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class BigQueryTornadoesTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 4a92abb..35cf252 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -22,7 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import coders
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index a8ed555..45c779f 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -27,7 +27,7 @@ import logging
import unittest
import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class CombinersTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index cd1c04a..2d35d8d 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -22,7 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import custom_ptransform
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 28bb1e1..44a352f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -22,7 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import filters
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class FiltersTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 26af71d..1bdb9a3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -31,8 +31,8 @@ string. The tags can contain only letters, digits and _.
"""
import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
from apache_beam.metrics import Metrics
+from apache_beam.testing.test_pipeline import TestPipeline
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index da0a962..85d8bde 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -36,7 +36,7 @@ from apache_beam.examples.snippets import snippets
from apache_beam.utils.windowed_value import WindowedValue
# pylint: disable=expression-not-assigned
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 54e54e8..4bee127 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -25,9 +25,9 @@ from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
from apache_beam.examples import wordcount
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
class WordCountIT(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 5f2db62..4a21839 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -26,7 +26,7 @@ from apache_beam.io import iobase
from apache_beam.io import avroio
from apache_beam.io import filebasedsource
from apache_beam.io import source_test_utils
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.util import assert_that
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 807c3fd..a02f9ad 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -26,7 +26,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.io.concat_source import ConcatSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 4ff23fc..e17a004 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -38,13 +38,13 @@ from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
-from apache_beam.options.value_provider import StaticValueProvider
-from apache_beam.options.value_provider import RuntimeValueProvider
class LineSource(FileBasedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index e0e9774..4c25505 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -31,7 +31,7 @@ import mock
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 5d4bb6f..a804c09 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -23,7 +23,7 @@ from mock import MagicMock
from apache_beam.io.gcp.datastore.v1 import fake_datastore
from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.tests.test_utils import patch_retry
+from apache_beam.testing.test_utils import patch_retry
# Protect against environments where apitools library is not available.
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 66d99b3..f42b70f 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -21,7 +21,7 @@ import logging
from hamcrest.core.base_matcher import BaseMatcher
-from apache_beam.tests.test_utils import compute_hash
+from apache_beam.testing.test_utils import compute_hash
from apache_beam.utils import retry
# Protect against environments where bigquery library is not available.
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index d8aa148..f12293e 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -24,7 +24,7 @@ from hamcrest import assert_that as hc_assert_that
from mock import Mock, patch
from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier
-from apache_beam.tests.test_utils import patch_retry
+from apache_beam.testing.test_utils import patch_retry
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f92756..c0b8ad6 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,7 +27,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 90dc665..d00afef 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -43,7 +43,7 @@ from apache_beam.io.filebasedsource_test import write_data
from apache_beam.io.filebasedsource_test import write_pattern
from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 29a9fb8..b7e370d 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -35,7 +35,7 @@ from apache_beam.io.tfrecordio import _TFRecordSource
from apache_beam.io.tfrecordio import _TFRecordUtil
from apache_beam.io.tfrecordio import ReadFromTFRecord
from apache_beam.io.tfrecordio import WriteToTFRecord
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
import crcmod
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index ebcc43b..c6b1e48 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -32,7 +32,7 @@ from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.pvalue import AsSingleton
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import CombineGlobally
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 529ddf7..4acbc52 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -20,7 +20,7 @@
import unittest
from apache_beam.pvalue import PValue
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
class PValueTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index a9f61a7..b61a683 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -26,6 +26,7 @@ import mock
import apache_beam as beam
import apache_beam.transforms as ptransform
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline, AppliedPTransform
from apache_beam.pvalue import PCollection
from apache_beam.runners import create_runner
@@ -34,10 +35,9 @@ from apache_beam.runners import TestDataflowRunner
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.typehints import typehints
-from apache_beam.options.pipeline_options import PipelineOptions
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
deleted file mode 100644
index 20f4839..0000000
--- a/sdks/python/apache_beam/test_pipeline.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Test Pipeline, a wrapper of Pipeline for test purpose"""
-
-import argparse
-import shlex
-
-from apache_beam.internal import pickler
-from apache_beam.pipeline import Pipeline
-from apache_beam.runners.runner import PipelineState
-from apache_beam.options.pipeline_options import PipelineOptions
-from nose.plugins.skip import SkipTest
-
-
-class TestPipeline(Pipeline):
- """TestPipeline class is used inside of Beam tests that can be configured to
- run against pipeline runner.
-
- It has a functionality to parse arguments from command line and build pipeline
- options for tests who runs against a pipeline runner and utilizes resources
- of the pipeline runner. Those test functions are recommended to be tagged by
- @attr("ValidatesRunner") annotation.
-
- In order to configure the test with customized pipeline options from command
- line, system argument 'test-pipeline-options' can be used to obtains a list
- of pipeline options. If no options specified, default value will be used.
-
- For example, use following command line to execute all ValidatesRunner tests::
-
- python setup.py nosetests -a ValidatesRunner \
- --test-pipeline-options="--runner=DirectRunner \
- --job_name=myJobName \
- --num_workers=1"
-
- For example, use assert_that for test validation::
-
- pipeline = TestPipeline()
- pcoll = ...
- assert_that(pcoll, equal_to(...))
- pipeline.run()
- """
-
- def __init__(self,
- runner=None,
- options=None,
- argv=None,
- is_integration_test=False,
- blocking=True):
- """Initialize a pipeline object for test.
-
- Args:
- runner: An object of type 'PipelineRunner' that will be used to execute
- the pipeline. For registered runners, the runner name can be specified,
- otherwise a runner object must be supplied.
- options: A configured 'PipelineOptions' object containing arguments
- that should be used for running the pipeline job.
- argv: A list of arguments (such as sys.argv) to be used for building a
- 'PipelineOptions' object. This will only be used if argument 'options'
- is None.
- is_integration_test: True if the test is an integration test, False
- otherwise.
- blocking: Run method will wait until pipeline execution is completed.
-
- Raises:
- ValueError: if either the runner or options argument is not of the
- expected type.
- """
- self.is_integration_test = is_integration_test
- self.options_list = self._parse_test_option_args(argv)
- self.blocking = blocking
- if options is None:
- options = PipelineOptions(self.options_list)
- super(TestPipeline, self).__init__(runner, options)
-
- def run(self):
- result = super(TestPipeline, self).run()
- if self.blocking:
- state = result.wait_until_finish()
- assert state == PipelineState.DONE, "Pipeline execution failed."
-
- return result
-
- def _parse_test_option_args(self, argv):
- """Parse value of command line argument: --test-pipeline-options to get
- pipeline options.
-
- Args:
- argv: An iterable of command line arguments to be used. If not specified
- then sys.argv will be used as input for parsing arguments.
-
- Returns:
- An argument list of options that can be parsed by argparser or directly
- build a pipeline option.
- """
- parser = argparse.ArgumentParser()
- parser.add_argument('--test-pipeline-options',
- type=str,
- action='store',
- help='only run tests providing service options')
- known, unused_argv = parser.parse_known_args(argv)
-
- if self.is_integration_test and not known.test_pipeline_options:
- # Skip integration test when argument '--test-pipeline-options' is not
- # specified since nose calls integration tests when runs unit test by
- # 'setup.py test'.
- raise SkipTest('IT is skipped because --test-pipeline-options '
- 'is not specified')
-
- return shlex.split(known.test_pipeline_options) \
- if known.test_pipeline_options else []
-
- def get_full_options_as_args(self, **extra_opts):
- """Get full pipeline options as an argument list.
-
- Append extra pipeline options to existing option list if provided.
- Test verifier (if contains in extra options) should be pickled before
- appending, and will be unpickled later in the TestRunner.
- """
- options = list(self.options_list)
- for k, v in extra_opts.items():
- if not v:
- continue
- elif isinstance(v, bool) and v:
- options.append('--%s' % k)
- elif 'matcher' in k:
- options.append('--%s=%s' % (k, pickler.dumps(v)))
- else:
- options.append('--%s=%s' % (k, v))
- return options
-
- def get_option(self, opt_name):
- """Get a pipeline option value by name
-
- Args:
- opt_name: The name of the pipeline option.
-
- Returns:
- None if option is not found in existing option list which is generated
- by parsing value of argument `test-pipeline-options`.
- """
- parser = argparse.ArgumentParser()
- opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name
- # Option name should start with '--' when it's used for parsing.
- parser.add_argument('--' + opt_name,
- type=str,
- action='store')
- known, _ = parser.parse_known_args(self.options_list)
- return getattr(known, opt_name) if hasattr(known, opt_name) else None
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
deleted file mode 100644
index 325cab7..0000000
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit test for the TestPipeline class"""
-
-import logging
-import unittest
-
-from hamcrest.core.base_matcher import BaseMatcher
-from hamcrest.core.assert_that import assert_that as hc_assert_that
-
-from apache_beam.internal import pickler
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.options.pipeline_options import PipelineOptions
-
-
-# A simple matcher that is ued for testing extra options appending.
-class SimpleMatcher(BaseMatcher):
- def _matches(self, item):
- return True
-
-
-class TestPipelineTest(unittest.TestCase):
-
- TEST_CASE = {'options':
- ['--test-pipeline-options', '--job=mockJob --male --age=1'],
- 'expected_list': ['--job=mockJob', '--male', '--age=1'],
- 'expected_dict': {'job': 'mockJob',
- 'male': True,
- 'age': 1}}
-
- # Used for testing pipeline option creation.
- class TestParsingOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--job', action='store', help='mock job')
- parser.add_argument('--male', action='store_true', help='mock gender')
- parser.add_argument('--age', action='store', type=int, help='mock age')
-
- def test_option_args_parsing(self):
- test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
- self.assertListEqual(
- sorted(test_pipeline.get_full_options_as_args()),
- sorted(self.TEST_CASE['expected_list']))
-
- def test_empty_option_args_parsing(self):
- test_pipeline = TestPipeline()
- self.assertListEqual([],
- test_pipeline.get_full_options_as_args())
-
- def test_create_test_pipeline_options(self):
- test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
- test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
- self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
- test_options.get_all_options())
-
- EXTRA_OPT_CASES = [
- {'options': {'name': 'Mark'},
- 'expected': ['--name=Mark']},
- {'options': {'student': True},
- 'expected': ['--student']},
- {'options': {'student': False},
- 'expected': []},
- {'options': {'name': 'Mark', 'student': True},
- 'expected': ['--name=Mark', '--student']}
- ]
-
- def test_append_extra_options(self):
- test_pipeline = TestPipeline()
- for case in self.EXTRA_OPT_CASES:
- opt_list = test_pipeline.get_full_options_as_args(**case['options'])
- self.assertListEqual(sorted(opt_list), sorted(case['expected']))
-
- def test_append_verifier_in_extra_opt(self):
- extra_opt = {'matcher': SimpleMatcher()}
- opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
- _, value = opt_list[0].split('=', 1)
- matcher = pickler.loads(value)
- self.assertTrue(isinstance(matcher, BaseMatcher))
- hc_assert_that(None, matcher)
-
- def test_get_option(self):
- name, value = ('job', 'mockJob')
- test_pipeline = TestPipeline()
- test_pipeline.options_list = ['--%s=%s' % (name, value)]
- self.assertEqual(test_pipeline.get_option(name), value)
-
- def test_skip_IT(self):
- test_pipeline = TestPipeline(is_integration_test=True)
- test_pipeline.run()
- # Note that this will never be reached since it should be skipped above.
- self.fail()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/__init__.py b/sdks/python/apache_beam/testing/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/testing/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/privatekey.p12
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/data/privatekey.p12 b/sdks/python/apache_beam/testing/data/privatekey.p12
new file mode 100644
index 0000000..c369ecb
Binary files /dev/null and b/sdks/python/apache_beam/testing/data/privatekey.p12 differ
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/data/standard_coders.yaml b/sdks/python/apache_beam/testing/data/standard_coders.yaml
new file mode 100644
index 0000000..790cacb
--- /dev/null
+++ b/sdks/python/apache_beam/testing/data/standard_coders.yaml
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is broken into multiple sections delimited by ---. Each section specifies a set of
+# reference encodings for a single standardized coder used in a specific context.
+#
+# Each section contains up to 3 properties:
+#
+# coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
+# nested: a boolean meaning whether the coder was used in the nested context. Missing means to
+# test both contexts, a shorthand for when the coder is invariant across context.
+# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
+# The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
+# one of a few standard JSON types such as numbers, strings, dicts that map naturally
+# to the type encoded by the coder.
+#
+# These choices were made to strike a balance between portability, ease of use, and simple
+# legibility of this file itself.
+#
+# It is expected that future work will move the `coder` field into a format that it would be
+# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
+
+
+
+coder:
+ urn: "urn:beam:coders:bytes:0.1"
+nested: false
+examples:
+ "abc": abc
+ "ab\0c": "ab\0c"
+
+---
+
+coder:
+ urn: "urn:beam:coders:bytes:0.1"
+nested: true
+examples:
+ "\u0003abc": abc
+ "\u0004ab\0c": "ab\0c"
+ "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|":
+ " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|"
+
+---
+
+coder:
+ urn: "urn:beam:coders:varint:0.1"
+examples:
+ "\0": 0
+ "\u0001": 1
+ "\u000A": 10
+ "\u00c8\u0001": 200
+ "\u00e8\u0007": 1000
+ "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:varint:0.1"}]
+examples:
+ "\u0003abc\0": {key: abc, value: 0}
+ "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:bytes:0.1"}]
+nested: false
+examples:
+ "\u0003abcdef": {key: abc, value: def}
+ "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+ urn: "urn:beam:coders:kv:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"},
+ {urn: "urn:beam:coders:bytes:0.1"}]
+nested: true
+examples:
+ "\u0003abc\u0003def": {key: abc, value: def}
+ "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+ urn: "urn:beam:coders:interval_window:0.1"
+examples:
+ "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
+ "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
+ "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
+ "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"}]
+examples:
+ "\0\0\0\u0001\0": [0]
+ "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
+ "\0\0\0\0": []
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"}]
+examples:
+ "\0\0\0\u0001\u0003abc": ["abc"]
+ "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
+ "\0\0\0\0": []
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:bytes:0.1"}]
+ # This is for iterables of unknown length, where the encoding is not
+ # deterministic.
+ non_deterministic: True
+examples:
+ "\u00ff\u00ff\u00ff\u00ff\u0000": []
+ "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+ "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+ urn: "urn:beam:coders:stream:0.1"
+ components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+ "\0\0\0\u0001": [""]
+
+---
+
+coder:
+ urn: "urn:beam:coders:global_window:0.1"
+examples:
+ "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+ urn: "urn:beam:coders:windowed_value:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"},
+ {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+ "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+ value: 2,
+ timestamp: 1454293425000,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: ["global"]
+ }
+
+---
+
+coder:
+ urn: "urn:beam:coders:windowed_value:0.1"
+ components: [{urn: "urn:beam:coders:varint:0.1"},
+ {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+ "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
+ value: 4,
+ timestamp: -400000,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: [{end: 1454293425000, span: 280000}]
+ }
+
+ "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
+ value: 2,
+ timestamp: -100,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+ }
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
new file mode 100644
index 0000000..5a6082a
--- /dev/null
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test result verifiers
+
+A set of verifiers that are used in end-to-end tests to verify state/output
+of test pipeline job. Customized verifier should extend
+`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
+"""
+
+import logging
+import time
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils as utils
+from apache_beam.utils import retry
+
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+
+MAX_RETRIES = 4
+
+
+class PipelineStateMatcher(BaseMatcher):
+ """Matcher that verify pipeline job terminated in expected state
+
+ Matcher compares the actual pipeline terminate state with expected.
+ By default, `PipelineState.DONE` is used as expected state.
+ """
+
+ def __init__(self, expected_state=PipelineState.DONE):
+ self.expected_state = expected_state
+
+ def _matches(self, pipeline_result):
+ return pipeline_result.state == self.expected_state
+
+ def describe_to(self, description):
+ description \
+ .append_text("Test pipeline expected terminated in state: ") \
+ .append_text(self.expected_state)
+
+ def describe_mismatch(self, pipeline_result, mismatch_description):
+ mismatch_description \
+ .append_text("Test pipeline job terminated in state: ") \
+ .append_text(pipeline_result.state)
+
+
+def retry_on_io_error_and_server_error(exception):
+ """Filter allowing retries on file I/O errors and service error."""
+ return isinstance(exception, IOError) or \
+ (HttpError is not None and isinstance(exception, HttpError))
+
+
+class FileChecksumMatcher(BaseMatcher):
+ """Matcher that verifies file(s) content by comparing file checksum.
+
+ Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
+ is a hash string computed from content of file(s).
+ """
+
+ def __init__(self, file_path, expected_checksum, sleep_secs=None):
+ """Initialize a FileChecksumMatcher object
+
+ Args:
+ file_path : A string that is the full path of output file. This path
+ can contain globs.
+ expected_checksum : A hash string that is computed from expected
+ result.
+ sleep_secs : Number of seconds to wait before verification start.
+ Extra time are given to make sure output files are ready on FS.
+ """
+ if sleep_secs is not None:
+ if isinstance(sleep_secs, int):
+ self.sleep_secs = sleep_secs
+ else:
+ raise ValueError('Sleep seconds, if received, must be int. '
+ 'But received: %r, %s' % (sleep_secs,
+ type(sleep_secs)))
+ else:
+ self.sleep_secs = None
+
+ self.file_path = file_path
+ self.expected_checksum = expected_checksum
+
+ @retry.with_exponential_backoff(
+ num_retries=MAX_RETRIES,
+ retry_filter=retry_on_io_error_and_server_error)
+ def _read_with_retry(self):
+ """Read path with retry if I/O failed"""
+ read_lines = []
+ match_result = FileSystems.match([self.file_path])[0]
+ matched_path = [f.path for f in match_result.metadata_list]
+ if not matched_path:
+ raise IOError('No such file or directory: %s' % self.file_path)
+
+ logging.info('Find %d files in %s: \n%s',
+ len(matched_path), self.file_path, '\n'.join(matched_path))
+ for path in matched_path:
+ with FileSystems.open(path, 'r') as f:
+ for line in f:
+ read_lines.append(line)
+ return read_lines
+
+ def _matches(self, _):
+ if self.sleep_secs:
+ # Wait to have output file ready on FS
+ logging.info('Wait %d seconds...', self.sleep_secs)
+ time.sleep(self.sleep_secs)
+
+ # Read from given file(s) path
+ read_lines = self._read_with_retry()
+
+ # Compute checksum
+ self.checksum = utils.compute_hash(read_lines)
+ logging.info('Read from given path %s, %d lines, checksum: %s.',
+ self.file_path, len(read_lines), self.checksum)
+ return self.checksum == self.expected_checksum
+
+ def describe_to(self, description):
+ description \
+ .append_text("Expected checksum is ") \
+ .append_text(self.expected_checksum)
+
+ def describe_mismatch(self, pipeline_result, mismatch_description):
+ mismatch_description \
+ .append_text("Actual checksum is ") \
+ .append_text(self.checksum)
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers_test.py b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
new file mode 100644
index 0000000..15e0a04
--- /dev/null
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers_test.py
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test pipeline verifiers"""
+
+import logging
+import tempfile
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.localfilesystem import LocalFileSystem
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_utils import patch_retry
+from apache_beam.testing import pipeline_verifiers as verifiers
+
+try:
+ # pylint: disable=wrong-import-order, wrong-import-position
+ # pylint: disable=ungrouped-imports
+ from apitools.base.py.exceptions import HttpError
+ from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+ HttpError = None
+ GCSFileSystem = None
+
+
+class PipelineVerifiersTest(unittest.TestCase):
+
+ def setUp(self):
+ self._mock_result = Mock()
+ patch_retry(self, verifiers)
+
+ def test_pipeline_state_matcher_success(self):
+ """Test PipelineStateMatcher successes when using default expected state
+ and job actually finished in DONE
+ """
+ pipeline_result = PipelineResult(PipelineState.DONE)
+ hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+ def test_pipeline_state_matcher_given_state(self):
+ """Test PipelineStateMatcher successes when matches given state"""
+ pipeline_result = PipelineResult(PipelineState.FAILED)
+ hc_assert_that(pipeline_result,
+ verifiers.PipelineStateMatcher(PipelineState.FAILED))
+
+ def test_pipeline_state_matcher_fails(self):
+ """Test PipelineStateMatcher fails when using default expected state
+ and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN
+ """
+ failed_state = [PipelineState.CANCELLED,
+ PipelineState.DRAINED,
+ PipelineState.FAILED,
+ PipelineState.STOPPED,
+ PipelineState.UNKNOWN]
+
+ for state in failed_state:
+ pipeline_result = PipelineResult(state)
+ with self.assertRaises(AssertionError):
+ hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
+
+ test_cases = [
+ {'content': 'Test FileChecksumMatcher with single file',
+ 'num_files': 1,
+ 'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
+ {'content': 'Test FileChecksumMatcher with multiple files',
+ 'num_files': 3,
+ 'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
+ {'content': '',
+ 'num_files': 1,
+ 'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
+ ]
+
+ def create_temp_file(self, content, directory=None):
+ with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
+ f.write(content)
+ return f.name
+
+ def test_file_checksum_matcher_success(self):
+ for case in self.test_cases:
+ temp_dir = tempfile.mkdtemp()
+ for _ in range(case['num_files']):
+ self.create_temp_file(case['content'], temp_dir)
+ matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+ case['expected_checksum'])
+ hc_assert_that(self._mock_result, matcher)
+
+ @patch.object(LocalFileSystem, 'match')
+ def test_file_checksum_matcher_read_failed(self, mock_match):
+ mock_match.side_effect = IOError('No file found.')
+ matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
+ with self.assertRaises(IOError):
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mock_match.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
+
+ @patch.object(GCSFileSystem, 'match')
+ @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
+ def test_file_checksum_matcher_service_error(self, mock_match):
+ mock_match.side_effect = HttpError(
+ response={'status': '404'}, url='', content='Not Found',
+ )
+ matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
+ with self.assertRaises(HttpError):
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mock_match.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
+
+ def test_file_checksum_matchcer_invalid_sleep_time(self):
+ with self.assertRaises(ValueError) as cm:
+ verifiers.FileChecksumMatcher('file_path',
+ 'expected_checksum',
+ 'invalid_sleep_time')
+ self.assertEqual(cm.exception.message,
+ 'Sleep seconds, if received, must be int. '
+ 'But received: \'invalid_sleep_time\', '
+ '<type \'str\'>')
+
+ @patch('time.sleep', return_value=None)
+ def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
+ temp_dir = tempfile.mkdtemp()
+ case = self.test_cases[0]
+ self.create_temp_file(case['content'], temp_dir)
+ matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+ case['expected_checksum'],
+ 10)
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mocked_sleep.called)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
new file mode 100644
index 0000000..20f4839
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test Pipeline, a wrapper of Pipeline for test purpose"""
+
+import argparse
+import shlex
+
+from apache_beam.internal import pickler
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
+from apache_beam.options.pipeline_options import PipelineOptions
+from nose.plugins.skip import SkipTest
+
+
+class TestPipeline(Pipeline):
+  """TestPipeline class is used inside of Beam tests that can be configured to
+  run against pipeline runner.
+
+  It has a functionality to parse arguments from command line and build pipeline
+  options for tests that run against a pipeline runner and utilize resources
+  of the pipeline runner. Those test functions are recommended to be tagged by
+  @attr("ValidatesRunner") annotation.
+
+  In order to configure the test with customized pipeline options from command
+  line, system argument 'test-pipeline-options' can be used to obtain a list
+  of pipeline options. If no options are specified, default values are used.
+
+  For example, use following command line to execute all ValidatesRunner tests::
+
+    python setup.py nosetests -a ValidatesRunner \
+        --test-pipeline-options="--runner=DirectRunner \
+                                 --job_name=myJobName \
+                                 --num_workers=1"
+
+  For example, use assert_that for test validation::
+
+    pipeline = TestPipeline()
+    pcoll = ...
+    assert_that(pcoll, equal_to(...))
+    pipeline.run()
+  """
+
+  def __init__(self,
+               runner=None,
+               options=None,
+               argv=None,
+               is_integration_test=False,
+               blocking=True):
+    """Initialize a pipeline object for test.
+
+    Args:
+      runner: An object of type 'PipelineRunner' that will be used to execute
+        the pipeline. For registered runners, the runner name can be specified,
+        otherwise a runner object must be supplied.
+      options: A configured 'PipelineOptions' object containing arguments
+        that should be used for running the pipeline job.
+      argv: A list of arguments (such as sys.argv) to be used for building a
+        'PipelineOptions' object. This will only be used if argument 'options'
+        is None. Its '--test-pipeline-options' value is always split into
+        options_list; if argv is None the process command line is parsed.
+      is_integration_test: True if the test is an integration test, False
+        otherwise. Integration tests are skipped when no
+        '--test-pipeline-options' argument is given.
+      blocking: Run method will wait until pipeline execution is completed.
+
+    Raises:
+      ValueError: if either the runner or options argument is not of the
+        expected type.
+      SkipTest: if is_integration_test is True and '--test-pipeline-options'
+        was not specified.
+    """
+    self.is_integration_test = is_integration_test
+    self.options_list = self._parse_test_option_args(argv)
+    self.blocking = blocking
+    if options is None:
+      options = PipelineOptions(self.options_list)
+    super(TestPipeline, self).__init__(runner, options)
+
+  def run(self):
+    """Run the pipeline; when blocking, wait and require a DONE final state.
+
+    Returns:
+      The pipeline result returned by Pipeline.run().
+
+    Raises:
+      AssertionError: if blocking and the final state is not DONE.
+    """
+    result = super(TestPipeline, self).run()
+    if self.blocking:
+      state = result.wait_until_finish()
+      # Raise explicitly rather than via `assert` so the check is not
+      # stripped when Python runs with -O.
+      if state != PipelineState.DONE:
+        raise AssertionError('Pipeline execution failed.')
+
+    return result
+
+  def _parse_test_option_args(self, argv):
+    """Parse value of command line argument: --test-pipeline-options to get
+    pipeline options.
+
+    Args:
+      argv: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+
+    Returns:
+      An argument list of options that can be parsed by argparser or directly
+      build a pipeline option.
+
+    Raises:
+      SkipTest: for an integration test without '--test-pipeline-options'.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--test-pipeline-options',
+                        type=str,
+                        action='store',
+                        help='only run tests providing service options')
+    known, unused_argv = parser.parse_known_args(argv)
+
+    if self.is_integration_test and not known.test_pipeline_options:
+      # Skip integration test when argument '--test-pipeline-options' is not
+      # specified since nose calls integration tests when runs unit test by
+      # 'setup.py test'.
+      raise SkipTest('IT is skipped because --test-pipeline-options '
+                     'is not specified')
+
+    return shlex.split(known.test_pipeline_options) \
+        if known.test_pipeline_options else []
+
+  def get_full_options_as_args(self, **extra_opts):
+    """Get full pipeline options as an argument list.
+
+    Append extra pipeline options to existing option list if provided.
+    Test verifier (if contains in extra options) should be pickled before
+    appending, and will be unpickled later in the TestRunner.
+
+    Args:
+      **extra_opts: Option name/value pairs. Falsy values are dropped, True
+        becomes a bare '--name' switch, values whose name contains 'matcher'
+        are pickled, and everything else becomes '--name=value'.
+
+    Returns:
+      A new list; self.options_list itself is not modified.
+    """
+    options = list(self.options_list)
+    for k, v in extra_opts.items():
+      if not v:
+        continue
+      elif isinstance(v, bool):
+        # Only True reaches here (False is falsy): emit a bare switch.
+        options.append('--%s' % k)
+      elif 'matcher' in k:
+        options.append('--%s=%s' % (k, pickler.dumps(v)))
+      else:
+        options.append('--%s=%s' % (k, v))
+    return options
+
+  def get_option(self, opt_name):
+    """Get a pipeline option value by name.
+
+    Args:
+      opt_name: The name of the pipeline option, with or without a leading
+        '--' (e.g. 'job' or '--job').
+
+    Returns:
+      The option value as a string, or None if the option is not found in the
+      existing option list, which is generated by parsing the value of
+      argument `test-pipeline-options`.
+    """
+    # Strip an optional '--' prefix. (Previously opt_name[:2] was kept, which
+    # turned '--job' into '--' and registered the bogus option '----'.)
+    if opt_name.startswith('--'):
+      opt_name = opt_name[2:]
+    parser = argparse.ArgumentParser()
+    # Option name should start with '--' when it's used for parsing. An
+    # explicit dest keeps the lookup key equal to opt_name even when the name
+    # contains '-', which argparse would otherwise convert to '_'.
+    parser.add_argument('--' + opt_name,
+                        dest=opt_name,
+                        type=str,
+                        action='store')
+    known, _ = parser.parse_known_args(self.options_list)
+    return getattr(known, opt_name, None)
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline_test.py b/sdks/python/apache_beam/testing/test_pipeline_test.py
new file mode 100644
index 0000000..747d64c7
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_pipeline_test.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for the TestPipeline class"""
+
+import logging
+import unittest
+
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.assert_that import assert_that as hc_assert_that
+
+from apache_beam.internal import pickler
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+# A simple matcher used for testing that matchers passed as extra options are
+# pickled when appended.
+class SimpleMatcher(BaseMatcher):
+  """Hamcrest matcher that accepts every item."""
+
+  def _matches(self, item):
+    return True
+
+  def describe_to(self, description):
+    # BaseMatcher does not implement describe_to. Provide it so describing
+    # the matcher (str(), mismatch reports) does not raise.
+    description.append_text('anything')
+
+
+class TestPipelineTest(unittest.TestCase):
+  """Unit tests for TestPipeline.
+
+  Every TestPipeline here gets an explicit argv, so the results do not depend
+  on the command line (e.g. --test-pipeline-options) of the test process.
+  """
+
+  TEST_CASE = {'options':
+               ['--test-pipeline-options', '--job=mockJob --male --age=1'],
+               'expected_list': ['--job=mockJob', '--male', '--age=1'],
+               'expected_dict': {'job': 'mockJob',
+                                 'male': True,
+                                 'age': 1}}
+
+  # Used for testing pipeline option creation.
+  class TestParsingOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--job', action='store', help='mock job')
+      parser.add_argument('--male', action='store_true', help='mock gender')
+      parser.add_argument('--age', action='store', type=int, help='mock age')
+
+  def test_option_args_parsing(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    self.assertListEqual(
+        sorted(test_pipeline.get_full_options_as_args()),
+        sorted(self.TEST_CASE['expected_list']))
+
+  def test_empty_option_args_parsing(self):
+    test_pipeline = TestPipeline(argv=[])
+    self.assertListEqual([],
+                         test_pipeline.get_full_options_as_args())
+
+  def test_create_test_pipeline_options(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
+    all_options = test_options.get_all_options()
+    # Per-key checks instead of the deprecated assertDictContainsSubset.
+    for key, expected in self.TEST_CASE['expected_dict'].items():
+      self.assertIn(key, all_options)
+      self.assertEqual(expected, all_options[key])
+
+  EXTRA_OPT_CASES = [
+      {'options': {'name': 'Mark'},
+       'expected': ['--name=Mark']},
+      {'options': {'student': True},
+       'expected': ['--student']},
+      {'options': {'student': False},
+       'expected': []},
+      {'options': {'name': 'Mark', 'student': True},
+       'expected': ['--name=Mark', '--student']}
+  ]
+
+  def test_append_extra_options(self):
+    test_pipeline = TestPipeline(argv=[])
+    for case in self.EXTRA_OPT_CASES:
+      opt_list = test_pipeline.get_full_options_as_args(**case['options'])
+      self.assertListEqual(sorted(opt_list), sorted(case['expected']))
+
+  def test_append_verifier_in_extra_opt(self):
+    extra_opt = {'matcher': SimpleMatcher()}
+    # With argv=[] the base option list is empty, so the matcher is element 0.
+    opt_list = TestPipeline(argv=[]).get_full_options_as_args(**extra_opt)
+    _, value = opt_list[0].split('=', 1)
+    matcher = pickler.loads(value)
+    self.assertIsInstance(matcher, BaseMatcher)
+    hc_assert_that(None, matcher)
+
+  def test_get_option(self):
+    name, value = ('job', 'mockJob')
+    test_pipeline = TestPipeline(argv=[])
+    test_pipeline.options_list = ['--%s=%s' % (name, value)]
+    self.assertEqual(test_pipeline.get_option(name), value)
+
+  def test_skip_IT(self):
+    # Without --test-pipeline-options, constructing an integration-test
+    # TestPipeline raises SkipTest, which reports this test as skipped.
+    test_pipeline = TestPipeline(argv=[], is_integration_test=True)
+    test_pipeline.run()
+    # Note that this will never be reached since it should be skipped above.
+    self.fail('Integration test was not skipped.')
+
+
+if __name__ == '__main__':
+  # Run the tests with INFO-level logging when executed directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
new file mode 100644
index 0000000..7ae27b7
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides TestStream for verifying streaming runner semantics."""
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+from apache_beam import coders
+from apache_beam import pvalue
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class Event(object):
+  """Base class for the events a TestStream emits during execution.
+
+  Events of different concrete types are ordered by type. Events of the same
+  type are ordered by the subclass-specific _typed_cmp.
+  """
+
+  # NOTE(review): __metaclass__, __cmp__ and cmp() are Python 2 mechanisms;
+  # Python 3 ignores the first two and has no cmp builtin.
+  __metaclass__ = ABCMeta
+
+  def __cmp__(self, other):
+    own_type, other_type = type(self), type(other)
+    if own_type is other_type:
+      return self._typed_cmp(other)
+    return cmp(own_type, other_type)
+
+  @abstractmethod
+  def _typed_cmp(self, other):
+    """Compare with an event of the same concrete type (cmp-style result)."""
+    raise NotImplementedError
+
+
+class ElementEvent(Event):
+  """Test stream event that emits a batch of timestamped elements."""
+
+  def __init__(self, timestamped_values):
+    # TestStream.add_elements passes a list of TimestampedValue here.
+    self.timestamped_values = timestamped_values
+
+  def _typed_cmp(self, other):
+    mine, theirs = self.timestamped_values, other.timestamped_values
+    return cmp(mine, theirs)
+
+
+class WatermarkEvent(Event):
+  """Test stream event that moves the watermark forward."""
+
+  def __init__(self, new_watermark):
+    # Normalize the given value (int, float or Timestamp) to a Timestamp.
+    self.new_watermark = timestamp.Timestamp.of(new_watermark)
+
+  def _typed_cmp(self, other):
+    mine, theirs = self.new_watermark, other.new_watermark
+    return cmp(mine, theirs)
+
+
+class ProcessingTimeEvent(Event):
+  """Test stream event that moves processing time forward by a duration."""
+
+  def __init__(self, advance_by):
+    # Normalize the given value (int, float or Duration) to a Duration.
+    self.advance_by = timestamp.Duration.of(advance_by)
+
+  def _typed_cmp(self, other):
+    mine, theirs = self.advance_by, other.advance_by
+    return cmp(mine, theirs)
+
+
+class TestStream(PTransform):
+  """Test stream that generates events on an unbounded PCollection of elements.
+
+  Each event emits elements, advances the watermark or advances the processing
+  time. After all of the specified elements are emitted, ceases to produce
+  output.
+  """
+
+  def __init__(self, coder=coders.FastPrimitivesCoder):
+    """Create a TestStream with no events.
+
+    Args:
+      coder: Coder for the produced elements. Either a Coder instance or a
+        Coder class; a class (such as the default FastPrimitivesCoder) is
+        instantiated with no arguments.
+    """
+    super(TestStream, self).__init__()
+    assert coder is not None
+    # Store an instance: _infer_output_coder hands self.coder out as the
+    # output coder, and elements are serialized with it. The default argument
+    # is the class itself, which is not usable as a coder.
+    if isinstance(coder, type):
+      coder = coder()
+    self.coder = coder
+    self.current_watermark = timestamp.MIN_TIMESTAMP
+    self.events = []
+
+  def expand(self, pbegin):
+    assert isinstance(pbegin, pvalue.PBegin)
+    self.pipeline = pbegin.pipeline
+    return pvalue.PCollection(self.pipeline)
+
+  def _infer_output_coder(self, input_type=None, input_coder=None):
+    return self.coder
+
+  def _add(self, event):
+    """Validate an event against the stream so far and append it.
+
+    Raises:
+      AssertionError: for an element at or after MAX_TIMESTAMP, a watermark
+        that does not strictly advance, or a non-positive processing-time step.
+      ValueError: for an event of unknown type.
+    """
+    if isinstance(event, ElementEvent):
+      for tv in event.timestamped_values:
+        assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
+            'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
+    elif isinstance(event, WatermarkEvent):
+      assert event.new_watermark > self.current_watermark, (
+          'Watermark must strictly-monotonically advance.')
+      self.current_watermark = event.new_watermark
+    elif isinstance(event, ProcessingTimeEvent):
+      assert event.advance_by > 0, (
+          'Must advance processing time by positive amount.')
+    else:
+      raise ValueError('Unknown event: %s' % event)
+    self.events.append(event)
+
+  def add_elements(self, elements):
+    """Add elements to the TestStream.
+
+    Elements added to the TestStream will be produced during pipeline execution.
+    These elements can be TimestampedValue, WindowedValue or raw unwrapped
+    elements that are serializable using the TestStream's specified Coder. When
+    a TimestampedValue or a WindowedValue element is used, the timestamp of the
+    TimestampedValue or WindowedValue will be the timestamp of the produced
+    element; otherwise, the current watermark timestamp will be used for that
+    element. The windows of a given WindowedValue are ignored by the
+    TestStream.
+    """
+    timestamped_values = []
+    for element in elements:
+      if isinstance(element, TimestampedValue):
+        timestamped_values.append(element)
+      elif isinstance(element, WindowedValue):
+        # Drop windows for elements in test stream.
+        timestamped_values.append(
+            TimestampedValue(element.value, element.timestamp))
+      else:
+        # Add elements with timestamp equal to current watermark.
+        timestamped_values.append(
+            TimestampedValue(element, self.current_watermark))
+    self._add(ElementEvent(timestamped_values))
+    return self
+
+  def advance_watermark_to(self, new_watermark):
+    """Advance the watermark to a given Unix timestamp.
+
+    The Unix timestamp value used must be later than the previous watermark
+    value and should be given as an int, float or utils.timestamp.Timestamp
+    object.
+    """
+    self._add(WatermarkEvent(new_watermark))
+    return self
+
+  def advance_watermark_to_infinity(self):
+    """Advance the watermark to the end of time."""
+    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
+    return self
+
+  def advance_processing_time(self, advance_by):
+    """Advance the current processing time by a given duration in seconds.
+
+    The duration must be a positive second duration and should be given as an
+    int, float or utils.timestamp.Duration object.
+    """
+    self._add(ProcessingTimeEvent(advance_by))
+    return self
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
new file mode 100644
index 0000000..e32dda2
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test_stream module."""
+
+import unittest
+
+from apache_beam.testing.test_stream import ElementEvent
+from apache_beam.testing.test_stream import ProcessingTimeEvent
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_stream import WatermarkEvent
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class TestStreamTest(unittest.TestCase):
+  """Unit tests for building TestStream event sequences."""
+
+  def test_basic_test_stream(self):
+    test_stream = (TestStream()
+                   .advance_watermark_to(0)
+                   .add_elements([
+                       'a',
+                       WindowedValue('b', 3, []),
+                       TimestampedValue('c', 6)])
+                   .advance_processing_time(10)
+                   .advance_watermark_to(8)
+                   .add_elements(['d'])
+                   .advance_watermark_to_infinity())
+    self.assertEqual(
+        test_stream.events,
+        [
+            WatermarkEvent(0),
+            ElementEvent([
+                TimestampedValue('a', 0),
+                TimestampedValue('b', 3),
+                TimestampedValue('c', 6),
+            ]),
+            ProcessingTimeEvent(10),
+            WatermarkEvent(8),
+            ElementEvent([
+                TimestampedValue('d', 8),
+            ]),
+            WatermarkEvent(timestamp.MAX_TIMESTAMP),
+        ]
+    )
+
+  def test_test_stream_errors(self):
+    # assertRaises(..., msg=...) only sets the failure text shown when nothing
+    # is raised and never inspects the exception, so compare messages directly.
+    with self.assertRaises(AssertionError) as cm:
+      _ = (TestStream()
+           .advance_watermark_to(5)
+           .advance_watermark_to(4))
+    self.assertEqual('Watermark must strictly-monotonically advance.',
+                     str(cm.exception))
+
+    with self.assertRaises(AssertionError) as cm:
+      _ = (TestStream()
+           .advance_processing_time(-1))
+    self.assertEqual('Must advance processing time by positive amount.',
+                     str(cm.exception))
+
+    with self.assertRaises(AssertionError) as cm:
+      _ = (TestStream()
+           .add_elements([
+               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
+           ]))
+    self.assertEqual(
+        'Element timestamp must be before timestamp.MAX_TIMESTAMP.',
+        str(cm.exception))
+
+
+if __name__ == '__main__':
+  # Run this module's tests when it is executed directly.
+  unittest.main(module='__main__')
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/testing/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
new file mode 100644
index 0000000..666207e
--- /dev/null
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility methods for testing"""
+
+import hashlib
+import imp
+from mock import Mock, patch
+
+from apache_beam.utils import retry
+
+DEFAULT_HASHING_ALG = 'sha1'
+
+
+def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
+  """Compute a hash value from a list of strings.
+
+  Elements are hashed in sorted order, so the result does not depend on the
+  order of `content`. The caller's list is left unmodified.
+
+  Args:
+    content: A list (or any iterable) of values; each is hashed as str(elem).
+    hashing_alg: Name of a hashlib algorithm; defaults to DEFAULT_HASHING_ALG.
+
+  Returns:
+    The hex digest over all sorted elements.
+  """
+  m = hashlib.new(hashing_alg)
+  # sorted() instead of content.sort() so the argument is not mutated.
+  for elem in sorted(content):
+    data = str(elem)
+    # hashlib needs bytes; on Python 2 str already is bytes.
+    if not isinstance(data, bytes):
+      data = data.encode('utf-8')
+    m.update(data)
+  return m.hexdigest()
+
+
+def patch_retry(testcase, module):
+  """A function to patch retry module to use mock clock and logger.
+
+  Clock and logger that defined in retry decorator will be replaced in test
+  in order to skip sleep phase when retry happens.
+
+  Args:
+    testcase: An instance of unittest.TestCase that calls this function to
+      patch retry module. The patch is undone through testcase.addCleanup.
+    module: The module that uses retry and need to be replaced with mock
+      clock and logger in test. It is reloaded after patching and again after
+      the patch is removed.
+  """
+  real_retry_with_exponential_backoff = retry.with_exponential_backoff
+
+  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
+    """A patch for retry decorator to use a mock dummy clock and logger."""
+    return real_retry_with_exponential_backoff(
+        num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
+        clock=Mock())
+
+  patcher = patch.object(retry, 'with_exponential_backoff',
+                         side_effect=patched_retry_with_exponential_backoff)
+  patcher.start()
+
+  # Reload module after patching so its decorators pick up the patch.
+  imp.reload(module)
+
+  def remove_patches():
+    # Stop only the patch started above; patch.stopall() would also stop
+    # patches that the test case started on its own.
+    patcher.stop()
+    # Reload module again after removing patch.
+    imp.reload(module)
+
+  testcase.addCleanup(remove_patches)
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/__init__.py b/sdks/python/apache_beam/tests/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/tests/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
http://git-wip-us.apache.org/repos/asf/beam/blob/2ecebd22/sdks/python/apache_beam/tests/data/privatekey.p12
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/privatekey.p12 b/sdks/python/apache_beam/tests/data/privatekey.p12
deleted file mode 100644
index c369ecb..0000000
Binary files a/sdks/python/apache_beam/tests/data/privatekey.p12 and /dev/null differ