You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/12 00:12:11 UTC
[17/19] beam git commit: Move assert_that, equal_to, is_empty to apache_beam.testing.util
Move assert_that, equal_to, is_empty to apache_beam.testing.util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2070f118
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2070f118
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2070f118
Branch: refs/heads/release-2.0.0
Commit: 2070f1182d49e3b7b3e9ed8a35173cb165fa5bfb
Parents: d0da682
Author: Charles Chen <cc...@google.com>
Authored: Thu May 11 15:07:30 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/game/hourly_team_score_test.py | 4 +-
.../examples/complete/game/user_score_test.py | 4 +-
.../apache_beam/examples/complete/tfidf_test.py | 4 +-
.../complete/top_wikipedia_sessions_test.py | 4 +-
.../cookbook/bigquery_side_input_test.py | 4 +-
.../cookbook/bigquery_tornadoes_test.py | 6 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/combiners_test.py | 6 +-
.../examples/cookbook/custom_ptransform_test.py | 4 +-
.../examples/cookbook/filters_test.py | 12 ++-
.../examples/cookbook/mergecontacts.py | 14 +--
.../apache_beam/examples/snippets/snippets.py | 17 +--
.../examples/snippets/snippets_test.py | 30 +++---
.../apache_beam/examples/wordcount_debugging.py | 6 +-
sdks/python/apache_beam/io/avroio_test.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 4 +-
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/sources_test.py | 4 +-
sdks/python/apache_beam/io/textio_test.py | 5 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 24 +++--
sdks/python/apache_beam/pipeline_test.py | 4 +-
.../portability/maptask_executor_runner_test.py | 6 +-
sdks/python/apache_beam/runners/runner_test.py | 4 +-
sdks/python/apache_beam/testing/util.py | 107 +++++++++++++++++++
sdks/python/apache_beam/testing/util_test.py | 50 +++++++++
.../apache_beam/transforms/combiners_test.py | 2 +-
.../apache_beam/transforms/create_test.py | 3 +-
.../apache_beam/transforms/ptransform_test.py | 2 +-
.../apache_beam/transforms/sideinputs_test.py | 2 +-
.../apache_beam/transforms/trigger_test.py | 2 +-
sdks/python/apache_beam/transforms/util.py | 79 --------------
sdks/python/apache_beam/transforms/util_test.py | 50 ---------
.../apache_beam/transforms/window_test.py | 2 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 2 +-
37 files changed, 271 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 438633a..378d222 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -22,8 +22,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class AutocompleteTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 12d8379..fd51309 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -22,8 +22,8 @@ import unittest
from apache_beam.examples.complete import estimate_pi
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
def in_between(lower, upper):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index bd0abca..9c30127 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete.game import hourly_team_score
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class HourlyTeamScoreTest(unittest.TestCase):
@@ -44,7 +46,7 @@ class HourlyTeamScoreTest(unittest.TestCase):
start_min='2015-11-16-15-20',
stop_min='2015-11-16-17-20',
window_duration=60))
- beam.assert_that(result, beam.equal_to([
+ assert_that(result, equal_to([
('team1', 18), ('team2', 2), ('team3', 13)]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 2db53bd..59903d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete.game import user_score
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class UserScoreTest(unittest.TestCase):
@@ -40,7 +42,7 @@ class UserScoreTest(unittest.TestCase):
with TestPipeline() as p:
result = (
p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
- beam.assert_that(result, beam.equal_to([
+ assert_that(result, equal_to([
('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
('user4_team3', 5)]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 0e30254..f177dfc 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -26,6 +26,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import tfidf
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
EXPECTED_RESULTS = set([
@@ -57,7 +59,7 @@ class TfIdfTest(unittest.TestCase):
uri_to_line
| tfidf.TfIdf()
| beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
- beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
+ assert_that(result, equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
# trigger the check the pipeline must be run.
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 4850c04..5fb6276 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -24,6 +24,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import top_wikipedia_sessions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class ComputeTopSessionsTest(unittest.TestCase):
@@ -54,7 +56,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
- beam.assert_that(result, beam.equal_to(self.EXPECTED))
+ assert_that(result, equal_to(self.EXPECTED))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 1ca25c9..b11dc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_side_input
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class BigQuerySideInputTest(unittest.TestCase):
@@ -42,7 +44,7 @@ class BigQuerySideInputTest(unittest.TestCase):
words_pcoll, ignore_corpus_pcoll,
ignore_word_pcoll)
- beam.assert_that(groups, beam.equal_to(
+ assert_that(groups, equal_to(
[('A', 'corpus2', 'word2'),
('B', 'corpus2', 'word2'),
('C', 'corpus2', 'word2')]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index ca7ca9e..c926df8 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_tornadoes
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class BigQueryTornadoesTest(unittest.TestCase):
@@ -35,8 +37,8 @@ class BigQueryTornadoesTest(unittest.TestCase):
{'month': 1, 'day': 3, 'tornado': True},
{'month': 2, 'day': 1, 'tornado': True}]))
results = bigquery_tornadoes.count_tornadoes(rows)
- beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
- {'month': 2, 'tornado_count': 1}]))
+ assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+ {'month': 2, 'tornado_count': 1}]))
p.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 35cf252..f71dad8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -23,8 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import coders
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CodersTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 45c779f..ee1fb77 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -28,6 +28,8 @@ import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CombinersTest(unittest.TestCase):
@@ -49,7 +51,7 @@ class CombinersTest(unittest.TestCase):
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(sum))
- beam.assert_that(result, beam.equal_to([('a', 6), ('b', 30), ('c', 100)]))
+ assert_that(result, equal_to([('a', 6), ('b', 30), ('c', 100)]))
result.pipeline.run()
def test_combine_per_key_with_custom_callable(self):
@@ -65,7 +67,7 @@ class CombinersTest(unittest.TestCase):
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(multiply))
- beam.assert_that(result, beam.equal_to([('a', 6), ('b', 200), ('c', 100)]))
+ assert_that(result, equal_to([('a', 6), ('b', 200), ('c', 100)]))
result.pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 2d35d8d..c7c6dba 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -23,8 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import custom_ptransform
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CustomCountTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 44a352f..fd49f93 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import filters
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class FiltersTest(unittest.TestCase):
@@ -45,22 +47,22 @@ class FiltersTest(unittest.TestCase):
def test_basic(self):
"""Test that the correct result is returned for a simple dataset."""
results = self._get_result_for_month(1)
- beam.assert_that(
+ assert_that(
results,
- beam.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
- {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
+ equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
+ {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
results.pipeline.run()
def test_basic_empty(self):
"""Test that the correct empty result is returned for a simple dataset."""
results = self._get_result_for_month(3)
- beam.assert_that(results, beam.equal_to([]))
+ assert_that(results, equal_to([]))
results.pipeline.run()
def test_basic_empty_missing(self):
"""Test that the correct empty result is returned for a missing month."""
results = self._get_result_for_month(4)
- beam.assert_that(results, beam.equal_to([]))
+ assert_that(results, equal_to([]))
results.pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5aaba10..4f53c61 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -40,6 +40,8 @@ from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
def run(argv=None, assert_results=None):
@@ -118,12 +120,12 @@ def run(argv=None, assert_results=None):
# TODO(silviuc): Move the assert_results logic to the unit test.
if assert_results is not None:
expected_luddites, expected_writers, expected_nomads = assert_results
- beam.assert_that(num_luddites, beam.equal_to([expected_luddites]),
- label='assert:luddites')
- beam.assert_that(num_writers, beam.equal_to([expected_writers]),
- label='assert:writers')
- beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
- label='assert:nomads')
+ assert_that(num_luddites, equal_to([expected_luddites]),
+ label='assert:luddites')
+ assert_that(num_writers, equal_to([expected_writers]),
+ label='assert:writers')
+ assert_that(num_nomads, equal_to([expected_nomads]),
+ label='assert:nomads')
# Execute pipeline.
return p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 1bdb9a3..7259572 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,8 @@ string. The tags can contain only letters, digits and _.
import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
@@ -566,8 +568,9 @@ def examples_wordcount_debugging(renames):
| 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# [START example_wordcount_debugging_assert]
- beam.assert_that(
- filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+ beam.testing.util.assert_that(
+ filtered_words, beam.testing.util.equal_to(
+ [('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
output = (filtered_words
@@ -661,8 +664,8 @@ def model_custom_source(count):
# [END model_custom_source_use_new_source]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
- beam.assert_that(
- lines, beam.equal_to(
+ assert_that(
+ lines, equal_to(
['line ' + str(number) for number in range(0, count)]))
p.run().wait_until_finish()
@@ -691,8 +694,8 @@ def model_custom_source(count):
# [END model_custom_source_use_ptransform]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
- beam.assert_that(
- lines, beam.equal_to(
+ assert_that(
+ lines, equal_to(
['line ' + str(number) for number in range(0, count)]))
# Don't test runner api due to pickling errors.
@@ -872,7 +875,7 @@ def model_textio_compressed(renames, expected):
compression_type=beam.io.filesystem.CompressionTypes.GZIP)
# [END model_textio_write_compressed]
- beam.assert_that(lines, beam.equal_to(expected))
+ assert_that(lines, equal_to(expected))
p.visit(SnippetUtils.RenameFiles(renames))
p.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 37cd470..f7b51a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -30,10 +30,10 @@ from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.coders.coders import ToStringCoder
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.examples.snippets import snippets
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.utils.windowed_value import WindowedValue
# pylint: disable=expression-not-assigned
@@ -158,11 +158,11 @@ class ParDoTest(unittest.TestCase):
avg_word_len))
# [END model_pardo_side_input]
- beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc']))
- beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']),
- label='larger_than_average')
- beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
- label='small_but_not_trivial')
+ assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+ assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+ label='larger_than_average')
+ assert_that(small_but_nontrivial, equal_to(['bb']),
+ label='small_but_not_trivial')
p.run()
def test_pardo_side_input_dofn(self):
@@ -816,7 +816,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([110, 215, 120]))
+ assert_that(unkeyed, equal_to([110, 215, 120]))
p.run()
def test_setting_sliding_windows(self):
@@ -834,8 +834,8 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed,
- beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
+ assert_that(unkeyed,
+ equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
p.run()
def test_setting_session_windows(self):
@@ -853,8 +853,8 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed,
- beam.equal_to([29, 27]))
+ assert_that(unkeyed,
+ equal_to([29, 27]))
p.run()
def test_setting_global_window(self):
@@ -872,7 +872,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([56]))
+ assert_that(unkeyed, equal_to([56]))
p.run()
def test_setting_timestamp(self):
@@ -903,7 +903,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+ assert_that(unkeyed, equal_to([42, 187]))
p.run()
@@ -921,7 +921,7 @@ class PTransformTest(unittest.TestCase):
p = TestPipeline()
lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
- beam.assert_that(lengths, beam.equal_to([1, 2, 3]))
+ assert_that(lengths, equal_to([1, 2, 3]))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 98acde4..ca9f7b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -51,6 +51,8 @@ from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class FilterTextFn(beam.DoFn):
@@ -133,8 +135,8 @@ def run(argv=None):
# of the Pipeline implies that the expectations were met. Learn more at
# https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
# test your pipeline.
- beam.assert_that(
- filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+ assert_that(
+ filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
# Format the counts into a PCollection of strings and write the output using a
# "Write" transform that has side effects.
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 4a21839..6dcf121 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -27,10 +27,10 @@ from apache_beam.io import avroio
from apache_beam.io import filebasedsource
from apache_beam.io import source_test_utils
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
# Importing following private class for testing purposes.
from apache_beam.io.avroio import _AvroSource as AvroSource
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index a02f9ad..4a8f519 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -27,8 +27,8 @@ from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.io.concat_source import ConcatSource
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class RangeSource(iobase.BoundedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index e17a004..afb340d 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -41,10 +41,10 @@ from apache_beam.io.filebasedsource import FileBasedSource
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
class LineSource(FileBasedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index c0b8ad6..10d401b 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -28,8 +28,8 @@ from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class LineSource(iobase.BoundedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d00afef..9a4ec47 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -44,9 +44,8 @@ from apache_beam.io.filebasedsource_test import write_pattern
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.testing.test_pipeline import TestPipeline
-
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
# TODO: Refactor code so all io tests are using same library
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index b7e370d..3c70ade 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -36,6 +36,8 @@ from apache_beam.io.tfrecordio import _TFRecordUtil
from apache_beam.io.tfrecordio import ReadFromTFRecord
from apache_beam.io.tfrecordio import WriteToTFRecord
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
import crcmod
@@ -254,7 +256,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo']))
+ assert_that(result, equal_to(['foo']))
def test_process_multiple(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -267,7 +269,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_gzip(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -280,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.GZIP,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_auto(self):
path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -293,7 +295,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
class TestReadFromTFRecordSource(TestTFRecordSource):
@@ -305,7 +307,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
result = (p
| ReadFromTFRecord(
path, compression_type=CompressionTypes.GZIP))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_gzip_auto(self):
path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -314,7 +316,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
result = (p
| ReadFromTFRecord(
path, compression_type=CompressionTypes.AUTO))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
@@ -337,7 +339,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
def test_end2end_auto_compression(self):
file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -351,7 +353,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
def test_end2end_auto_compression_unsharded(self):
file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -365,7 +367,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
@unittest.skipIf(tf is None, 'tensorflow not installed.')
def test_end2end_example_proto(self):
@@ -385,7 +387,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
actual_data = (p | ReadFromTFRecord(
file_path_prefix + '-*',
coder=beam.coders.ProtoCoder(example.__class__)))
- beam.assert_that(actual_data, beam.equal_to([example]))
+ assert_that(actual_data, equal_to([example]))
def test_end2end_read_write_read(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -400,7 +402,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(path+'-*', validate=True)
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8aa8a8a..e0775d1 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -33,6 +33,8 @@ from apache_beam.pipeline import PipelineVisitor
from apache_beam.pvalue import AsSingleton
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms import CombineGlobally
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
@@ -41,8 +43,6 @@ from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import MIN_TIMESTAMP
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 062e6f9..b7ba15a 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -28,9 +28,9 @@ from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
from apache_beam.pvalue import AsList
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.window import TimestampedValue
from apache_beam.runners.portability import maptask_executor_runner
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index c61c49f..fa80b1c 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,8 +36,8 @@ from apache_beam.metrics.metricbase import MetricName
from apache_beam.pipeline import Pipeline
from apache_beam.runners import DirectRunner
from apache_beam.runners import create_runner
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.options.pipeline_options import PipelineOptions
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
new file mode 100644
index 0000000..60a6b21
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for testing Beam pipelines."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.transforms import window
+from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
+from apache_beam.transforms.util import CoGroupByKey
+from apache_beam.transforms.ptransform import PTransform
+
+
+__all__ = [
+ 'assert_that',
+ 'equal_to',
+ 'is_empty',
+ ]
+
+
+class BeamAssertException(Exception):
+ """Exception raised by matcher classes used by assert_that transform."""
+
+ pass
+
+
+# Note that equal_to always sorts the expected and actual since what we
+# compare are PCollections for which there is no guaranteed order.
+# However the sorting does not go beyond top level therefore [1,2] and [2,1]
+# are considered equal and [[1,2]] and [[2,1]] are not.
+# TODO(silviuc): Add contains_in_any_order-style matchers.
+def equal_to(expected):
+ expected = list(expected)
+
+ def _equal(actual):
+ sorted_expected = sorted(expected)
+ sorted_actual = sorted(actual)
+ if sorted_expected != sorted_actual:
+ raise BeamAssertException(
+ 'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
+ return _equal
+
+
+def is_empty():
+ def _empty(actual):
+ actual = list(actual)
+ if actual:
+ raise BeamAssertException(
+ 'Failed assert: [] == %r' % actual)
+ return _empty
+
+
+def assert_that(actual, matcher, label='assert_that'):
+ """A PTransform that checks a PCollection has an expected value.
+
+ Note that assert_that should be used only for testing pipelines since the
+ check relies on materializing the entire PCollection being checked.
+
+ Args:
+ actual: A PCollection.
+ matcher: A matcher function taking as argument the actual value of a
+ materialized PCollection. The matcher validates this actual value against
+ expectations and raises BeamAssertException if they are not met.
+ label: Optional string label. This is needed in case several assert_that
+ transforms are introduced in the same pipeline.
+
+ Returns:
+ Ignored.
+ """
+ assert isinstance(actual, pvalue.PCollection)
+
+ class AssertThat(PTransform):
+
+ def expand(self, pcoll):
+ # We must have at least a single element to ensure the matcher
+ # code gets run even if the input pcollection is empty.
+ keyed_singleton = pcoll.pipeline | Create([(None, None)])
+ keyed_actual = (
+ pcoll
+ | WindowInto(window.GlobalWindows())
+ | "ToVoidKey" >> Map(lambda v: (None, v)))
+ _ = ((keyed_singleton, keyed_actual)
+ | "Group" >> CoGroupByKey()
+ | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+ | "Match" >> Map(matcher))
+
+ def default_label(self):
+ return label
+
+ actual | AssertThat() # pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util_test.py b/sdks/python/apache_beam/testing/util_test.py
new file mode 100644
index 0000000..1acebb6
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util_test.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for testing utilities."""
+
+import unittest
+
+from apache_beam import Create
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to, is_empty
+
+
+class UtilTest(unittest.TestCase):
+
+ def test_assert_that_passes(self):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails_on_empty_input(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails_on_empty_expected(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 2, 3]), is_empty())
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 1822c19..946a60a 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -24,13 +24,13 @@ import hamcrest as hc
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
import apache_beam.transforms.combiners as combine
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.core import CombineGlobally
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import Map
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
class CombineTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 9ede4c7..55ad7f3 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -20,9 +20,10 @@ import unittest
from apache_beam.io import source_test_utils
-from apache_beam import Create, assert_that, equal_to
+from apache_beam import Create
from apache_beam.coders import FastPrimitivesCoder
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
class CreateTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3320d79..f790660 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -33,12 +33,12 @@ from apache_beam.io.iobase import Read
from apache_beam.options.pipeline_options import TypeOptions
import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
from apache_beam.transforms.core import GroupByKeyOnly
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
import apache_beam.typehints as typehints
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0bc9107..6500681 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -24,8 +24,8 @@ from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
-from apache_beam.transforms.util import assert_that, equal_to
class SideInputsTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 2574c4b..a27f47f 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -27,6 +27,7 @@ import yaml
import apache_beam as beam
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import trigger
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.trigger import AccumulationMode
@@ -40,7 +41,6 @@ from apache_beam.transforms.trigger import GeneralTriggerDriver
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.transforms.trigger import Repeatedly
from apache_beam.transforms.trigger import TriggerFn
-from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import MIN_TIMESTAMP
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index a6ecf0a..a7484ac 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,14 +20,10 @@
from __future__ import absolute_import
-from apache_beam import pvalue
-from apache_beam.transforms import window
from apache_beam.transforms.core import CombinePerKey
-from apache_beam.transforms.core import Create
from apache_beam.transforms.core import Flatten
from apache_beam.transforms.core import GroupByKey
from apache_beam.transforms.core import Map
-from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import ptransform_fn
@@ -38,9 +34,6 @@ __all__ = [
'KvSwap',
'RemoveDuplicates',
'Values',
- 'assert_that',
- 'equal_to',
- 'is_empty',
]
@@ -169,75 +162,3 @@ def RemoveDuplicates(pcoll): # pylint: disable=invalid-name
| 'ToPairs' >> Map(lambda v: (v, None))
| 'Group' >> CombinePerKey(lambda vs: None)
| 'RemoveDuplicates' >> Keys())
-
-
-class BeamAssertException(Exception):
- """Exception raised by matcher classes used by assert_that transform."""
-
- pass
-
-
-# Note that equal_to always sorts the expected and actual since what we
-# compare are PCollections for which there is no guaranteed order.
-# However the sorting does not go beyond top level therefore [1,2] and [2,1]
-# are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
-def equal_to(expected):
- expected = list(expected)
-
- def _equal(actual):
- sorted_expected = sorted(expected)
- sorted_actual = sorted(actual)
- if sorted_expected != sorted_actual:
- raise BeamAssertException(
- 'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
- return _equal
-
-
-def is_empty():
- def _empty(actual):
- actual = list(actual)
- if actual:
- raise BeamAssertException(
- 'Failed assert: [] == %r' % actual)
- return _empty
-
-
-def assert_that(actual, matcher, label='assert_that'):
- """A PTransform that checks a PCollection has an expected value.
-
- Note that assert_that should be used only for testing pipelines since the
- check relies on materializing the entire PCollection being checked.
-
- Args:
- actual: A PCollection.
- matcher: A matcher function taking as argument the actual value of a
- materialized PCollection. The matcher validates this actual value against
- expectations and raises BeamAssertException if they are not met.
- label: Optional string label. This is needed in case several assert_that
- transforms are introduced in the same pipeline.
-
- Returns:
- Ignored.
- """
- assert isinstance(actual, pvalue.PCollection)
-
- class AssertThat(PTransform):
-
- def expand(self, pcoll):
- # We must have at least a single element to ensure the matcher
- # code gets run even if the input pcollection is empty.
- keyed_singleton = pcoll.pipeline | Create([(None, None)])
- keyed_actual = (
- pcoll
- | WindowInto(window.GlobalWindows())
- | "ToVoidKey" >> Map(lambda v: (None, v)))
- _ = ((keyed_singleton, keyed_actual)
- | "Group" >> CoGroupByKey()
- | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
- | "Match" >> Map(matcher))
-
- def default_label(self):
- return label
-
- actual | AssertThat() # pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
deleted file mode 100644
index 7fdef70..0000000
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the util transforms."""
-
-import unittest
-
-from apache_beam import Create
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to, is_empty
-
-
-class UtilTest(unittest.TestCase):
-
- def test_assert_that_passes(self):
- with TestPipeline() as p:
- assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails_on_empty_input(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails_on_empty_expected(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([1, 2, 3]), is_empty())
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a7797dd..fd1bb9d 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -21,6 +21,7 @@ import unittest
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
from apache_beam.transforms import core
@@ -31,7 +32,6 @@ from apache_beam.transforms import WindowInto
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 27e7caa..e31b9cc 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -23,8 +23,8 @@ import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, is_empty
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, is_empty
class _TestSink(iobase.Sink):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3494cfe..589dc0e 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,7 +25,7 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.options.pipeline_options import OptionsContext
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.typehints import WithTypeHints
# These test often construct a pipeline as value | PTransform to test side