Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:35 UTC
[3/6] beam git commit: Changed tests in examples/ and io/ to use TestPipeline.
Changed tests in examples/ and io/ to use TestPipeline.
Removed wait_until_finish except in a few cases:
- tfidf: kept as an example usage.
- some cookbook examples: their tests run the examples directly,
and I did not want to update the examples themselves to use TestPipeline.
- some snippets: where pipeline creation is part of the snippet
and was not easy to override.
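
For context, a minimal sketch of the pattern this commit applies to the affected
tests; the test class and transform below are illustrative placeholders, not code
from the repository.

import unittest

import apache_beam as beam
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to


class SampleTransformTest(unittest.TestCase):  # hypothetical test, for illustration only

  def test_double(self):
    # Before this change tests constructed pipelines directly against a runner:
    #   p = beam.Pipeline('DirectRunner')
    # TestPipeline picks up the runner and options from the test environment instead.
    p = TestPipeline()
    result = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
    assert_that(result, equal_to([2, 4, 6]))
    p.run()

Tests that invoke an example's run() directly keep an explicit wait instead,
e.g. example.run(args).wait_until_finish(), which is why those run() functions
now return the pipeline result.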
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/703c1bc1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/703c1bc1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/703c1bc1
Branch: refs/heads/python-sdk
Commit: 703c1bc18ac28aa1aa75a9359c1bcc4fbdd28f35
Parents: 2e49f51
Author: Ahmet Altay <al...@google.com>
Authored: Sat Jan 14 23:49:25 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:37 2017 -0800
----------------------------------------------------------------------
.../examples/complete/autocomplete_test.py | 3 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/juliaset/juliaset/juliaset.py | 2 +-
.../complete/juliaset/juliaset/juliaset_test.py | 2 +-
.../examples/complete/juliaset/juliaset_main.py | 2 +-
.../apache_beam/examples/complete/tfidf.py | 3 +-
.../apache_beam/examples/complete/tfidf_test.py | 3 +-
.../examples/complete/top_wikipedia_sessions.py | 2 +-
.../complete/top_wikipedia_sessions_test.py | 3 +-
.../cookbook/bigquery_side_input_test.py | 3 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 2 +-
.../examples/cookbook/bigshuffle_test.py | 2 +-
.../examples/cookbook/coders_test.py | 3 +-
.../examples/cookbook/combiners_test.py | 5 +-
.../examples/cookbook/custom_ptransform.py | 4 +-
.../examples/cookbook/custom_ptransform_test.py | 3 +-
.../examples/cookbook/datastore_wordcount.py | 7 ++-
.../examples/cookbook/filters_test.py | 3 +-
.../examples/cookbook/group_with_coder.py | 2 +-
.../examples/cookbook/group_with_coder_test.py | 4 +-
.../examples/cookbook/mergecontacts.py | 2 +-
.../examples/cookbook/mergecontacts_test.py | 3 +-
.../examples/cookbook/multiple_output_pardo.py | 4 +-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 65 ++++++++++----------
.../examples/snippets/snippets_test.py | 12 ++--
.../apache_beam/examples/streaming_wordcap.py | 2 +-
.../apache_beam/examples/streaming_wordcount.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 1 +
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 3 +-
.../apache_beam/io/filebasedsource_test.py | 19 +++---
sdks/python/apache_beam/io/fileio_test.py | 11 ++--
sdks/python/apache_beam/io/sources_test.py | 3 +-
sdks/python/apache_beam/io/textio_test.py | 26 ++++----
sdks/python/tox.ini | 1 +
38 files changed, 123 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 5ed4fb5..edf95f0 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,6 +21,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -30,7 +31,7 @@ class AutocompleteTest(unittest.TestCase):
WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
def test_top_prefixes(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | beam.Create(self.WORDS)
result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 0440ecc..10010cb 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -20,8 +20,8 @@
import logging
import unittest
-import apache_beam as beam
from apache_beam.examples.complete import estimate_pi
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import DataflowAssertException
@@ -38,7 +38,7 @@ def in_between(lower, upper):
class EstimatePiTest(unittest.TestCase):
def test_basics(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
# Note: Probabilistically speaking this test can fail with a probability
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 45fc1fb..30883dc 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -113,7 +113,7 @@ def run(argv=None): # pylint: disable=missing-docstring
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
| WriteToText(known_args.coordinate_output))
# pylint: enable=expression-not-assigned
- p.run()
+ return p.run()
# Optionally render the image and save it to a file.
# TODO(silviuc): Add this functionality.
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
index c13e857..c254eb4 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
@@ -52,7 +52,7 @@ class JuliaSetTest(unittest.TestCase):
if image_file_name is not None:
args.append('--image_output=%s' % image_file_name)
- juliaset.run(args)
+ juliaset.run(args).wait_until_finish()
def test_output_file_format(self):
grid_size = 5
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
index d6ba064..0db5431 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -38,7 +38,7 @@ an example:
python juliaset_main.py \
--job_name juliaset-$USER \
--project YOUR-PROJECT \
- --runner BlockingDataflowRunner \
+ --runner DataflowRunner \
--setup_file ./setup.py \
--staging_location gs://YOUR-BUCKET/juliaset/staging \
--temp_location gs://YOUR-BUCKET/juliaset/temp \
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 59b9d6f..4d6e0d3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -200,7 +200,8 @@ def run(argv=None):
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
- p.run()
+ # Execute the pipeline and wait until it is completed.
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index deda4cb..404ab44 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -25,6 +25,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import tfidf
+from apache_beam.test_pipeline import TestPipeline
EXPECTED_RESULTS = set([
@@ -47,7 +48,7 @@ class TfIdfTest(unittest.TestCase):
f.write(contents)
def test_tfidf_transform(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
uri_to_line = p | beam.Create(
'create sample',
[('1.txt', 'abc def ghi'),
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 2dea642..4920813 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -31,7 +31,7 @@ pipeline configuration in addition to the above:
--project YOUR_PROJECT_ID
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMPORARY_DIRECTORY
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
overridden with --input.
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 1d25807..9b9d9b1 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -23,6 +23,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import top_wikipedia_sessions
+from apache_beam.test_pipeline import TestPipeline
class ComputeTopSessionsTest(unittest.TestCase):
@@ -49,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
]
def test_compute_top_sessions(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 97c41d6..926f141 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -22,12 +22,13 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_side_input
+from apache_beam.test_pipeline import TestPipeline
class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
corpus_pcoll = p | beam.Create('create_corpus',
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2cb2c45..0fabe3f 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -36,7 +36,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
results = bigquery_tornadoes.count_tornadoes(rows)
beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
{'month': 2, 'tornado_count': 1}]))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index ceeefd6..b5eacce 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -87,7 +87,7 @@ def run(argv=None):
known_args.checksum_output + '-output')
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
index d73c976..60b6acc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
@@ -38,7 +38,7 @@ class BigShuffleTest(unittest.TestCase):
bigshuffle.run([
'--input=%s*' % temp_path,
'--output=%s.result' % temp_path,
- '--checksum_output=%s.checksum' % temp_path])
+ '--checksum_output=%s.checksum' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index ec0848f..4a92abb 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import coders
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -34,7 +35,7 @@ class CodersTest(unittest.TestCase):
{'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
def test_compute_points(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
result = (records
| 'points' >> beam.FlatMap(coders.compute_points)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 18bd3bc..a8ed555 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -27,6 +27,7 @@ import logging
import unittest
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
class CombinersTest(unittest.TestCase):
@@ -44,7 +45,7 @@ class CombinersTest(unittest.TestCase):
can be used.
"""
result = (
- beam.Pipeline(runner=beam.runners.DirectRunner())
+ TestPipeline()
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(sum))
@@ -60,7 +61,7 @@ class CombinersTest(unittest.TestCase):
return result
result = (
- beam.Pipeline(runner=beam.runners.DirectRunner())
+ TestPipeline()
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(multiply))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index cfbb99d..67d1ff8 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -51,7 +51,7 @@ def run_count1(known_args, options):
(p | beam.io.ReadFromText(known_args.input)
| Count1()
| beam.io.WriteToText(known_args.output))
- p.run()
+ p.run().wait_until_finish()
@beam.ptransform_fn
@@ -70,7 +70,7 @@ def run_count2(known_args, options):
(p | ReadFromText(known_args.input)
| Count2() # pylint: disable=no-value-for-parameter
| WriteToText(known_args.output))
- p.run()
+ p.run().wait_until_finish()
@beam.ptransform_fn
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 91309ae..cd1c04a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import custom_ptransform
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -39,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
def run_pipeline(self, count_implementation, factor=1):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 8f68fb4..25abb3e 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -141,7 +141,7 @@ def write_to_datastore(project, user_options, pipeline_options):
| 'write to datastore' >> WriteToDatastore(project))
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
def make_ancestor_query(kind, namespace, ancestor):
@@ -192,7 +192,10 @@ def read_from_datastore(project, user_options, pipeline_options):
num_shards=user_options.num_shards)
# Actually run the pipeline (all operations above are deferred).
- return p.run()
+ result = p.run()
+ # Wait until completion, main thread would access post-completion job results.
+ result.wait_until_finish()
+ return result
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 81dd30f..28bb1e1 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import filters
+from apache_beam.test_pipeline import TestPipeline
class FiltersTest(unittest.TestCase):
@@ -35,7 +36,7 @@ class FiltersTest(unittest.TestCase):
]
def _get_result_for_month(self, month):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
rows = (p | 'create' >> beam.Create(self.input_data))
results = filters.filter_cold_days(rows, month)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 78540d1..f6f2108 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -114,7 +114,7 @@ def run(argv=sys.argv[1:]):
| beam.CombinePerKey(sum)
| beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
| WriteToText(known_args.output))
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 268ba8d..4e87966 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase):
temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
group_with_coder.run([
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
+ '--output=%s.result' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
@@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase):
group_with_coder.run([
'--no_pipeline_type_check',
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
+ '--output=%s.result' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 2475e02..6906ae4 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -124,7 +124,7 @@ def run(argv=None, assert_results=None):
beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
label='assert:nomads')
# Execute pipeline.
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index b3be0dd..09f71d3 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -107,12 +107,13 @@ class MergeContactsTest(unittest.TestCase):
result_prefix = self.create_temp_file('')
- mergecontacts.run([
+ result = mergecontacts.run([
'--input_email=%s' % path_email,
'--input_phone=%s' % path_phone,
'--input_snailmail=%s' % path_snailmail,
'--output_tsv=%s.tsv' % result_prefix,
'--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
+ result.wait_until_finish()
with open('%s.tsv-00000-of-00001' % result_prefix) as f:
contents = f.read()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 3acebc6..a877a1d 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -40,7 +40,7 @@ pipeline configuration:
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMP_DIRECTORY
--job_name YOUR_JOB_NAME
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
and an output prefix on GCS:
--output gs://YOUR_OUTPUT_PREFIX
@@ -173,7 +173,7 @@ def run(argv=None):
| 'count words' >> CountWords()
| 'write words' >> WriteToText(known_args.output + '-words'))
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 3ddd668..2c9111c 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase):
multiple_output_pardo.run([
'--input=%s*' % temp_path,
- '--output=%s' % result_prefix])
+ '--output=%s' % result_prefix]).wait_until_finish()
expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
with open(result_prefix + '-chars-00000-of-00001') as f:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 2eadc44..0cba1af 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -31,6 +31,7 @@ string. The tags can contain only letters, digits and _.
"""
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
@@ -88,6 +89,8 @@ def construct_pipeline(renames):
p = beam.Pipeline(options=PipelineOptions())
# [END pipelines_constructing_creating]
+ p = TestPipeline() # Use TestPipeline for testing.
+
# [START pipelines_constructing_reading]
lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
# [END pipelines_constructing_reading]
@@ -106,8 +109,9 @@ def construct_pipeline(renames):
p.visit(SnippetUtils.RenameFiles(renames))
# [START pipelines_constructing_running]
- p.run()
+ result = p.run()
# [END pipelines_constructing_running]
+ result
def model_pipelines(argv):
@@ -144,8 +148,9 @@ def model_pipelines(argv):
| beam.combiners.Count.PerKey()
| beam.io.WriteToText(my_options.output))
- p.run()
+ result = p.run()
# [END model_pipelines]
+ result.wait_until_finish()
def model_pcollection(argv):
@@ -175,8 +180,9 @@ def model_pcollection(argv):
'Or to take arms against a sea of troubles, '])
| beam.io.WriteToText(my_options.output))
- p.run()
+ result = p.run()
# [END model_pcollection]
+ result.wait_until_finish()
def pipeline_options_remote(argv):
@@ -206,8 +212,7 @@ def pipeline_options_remote(argv):
options = PipelineOptions(flags=argv)
# For Cloud execution, set the Cloud Platform project, job_name,
- # staging location, temp_location and specify DataflowRunner or
- # BlockingDataflowRunner.
+ # staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
@@ -223,9 +228,7 @@ def pipeline_options_remote(argv):
my_input = my_options.input
my_output = my_options.output
- # Overriding the runner for tests.
- options.view_as(StandardOptions).runner = 'DirectRunner'
- p = Pipeline(options=options)
+ p = TestPipeline() # Use TestPipeline for testing.
lines = p | beam.io.ReadFromText(my_input)
lines | beam.io.WriteToText(my_output)
@@ -265,6 +268,7 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
+ p = TestPipeline() # Use TestPipeline for testing.
lines = p | beam.io.ReadFromText(my_input)
lines | beam.io.WriteToText(my_output)
p.run()
@@ -288,7 +292,7 @@ def pipeline_options_command_line(argv):
lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
# [END pipeline_options_command_line]
- p.run()
+ p.run().wait_until_finish()
def pipeline_logging(lines, output):
@@ -296,7 +300,6 @@ def pipeline_logging(lines, output):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
# [START pipeline_logging]
# import Python logging module.
@@ -316,7 +319,7 @@ def pipeline_logging(lines, output):
# Remaining WordCount example code ...
# [END pipeline_logging]
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
(p
| beam.Create(lines)
| beam.ParDo(ExtractWordsFn())
@@ -372,7 +375,7 @@ def pipeline_monitoring(renames):
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(WordCountOptions)
- p = beam.Pipeline(options=pipeline_options)
+ p = TestPipeline() # Use TestPipeline for testing.
# [START pipeline_monitoring_execution]
(p
@@ -405,7 +408,7 @@ def examples_wordcount_minimal(renames):
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
- options.view_as(StandardOptions).runner = 'BlockingDataflowRunner'
+ options.view_as(StandardOptions).runner = 'DataflowRunner'
# [END examples_wordcount_minimal_options]
# Run it locally for testing.
@@ -441,8 +444,9 @@ def examples_wordcount_minimal(renames):
p.visit(SnippetUtils.RenameFiles(renames))
# [START examples_wordcount_minimal_run]
- p.run()
+ result = p.run()
# [END examples_wordcount_minimal_run]
+ result.wait_until_finish()
def examples_wordcount_wordcount(renames):
@@ -497,7 +501,7 @@ def examples_wordcount_wordcount(renames):
formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.run().wait_until_finish()
def examples_wordcount_debugging(renames):
@@ -505,7 +509,6 @@ def examples_wordcount_debugging(renames):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
# [START example_wordcount_debugging_logging]
# [START example_wordcount_debugging_aggregators]
@@ -546,7 +549,7 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_logging]
# [END example_wordcount_debugging_aggregators]
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
filtered_words = (
p
| beam.io.ReadFromText(
@@ -649,7 +652,7 @@ def model_custom_source(count):
lines, beam.equal_to(
['line ' + str(number) for number in range(0, count)]))
- p.run()
+ p.run().wait_until_finish()
# We recommend users to start Source classes with an underscore to discourage
# using the Source class directly when a PTransform for the source is
@@ -679,7 +682,7 @@ def model_custom_source(count):
lines, beam.equal_to(
['line ' + str(number) for number in range(0, count)]))
- p.run()
+ p.run().wait_until_finish()
def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
@@ -781,7 +784,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
final_table_name))
# [END model_custom_sink_use_new_sink]
- p.run()
+ p.run().wait_until_finish()
# We recommend users to start Sink class names with an underscore to
# discourage using the Sink class directly when a PTransform for the sink is
@@ -812,7 +815,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
'http://url_to_simple_kv/', final_table_name)
# [END model_custom_sink_use_ptransform]
- p.run()
+ p.run().wait_until_finish()
def model_textio(renames):
@@ -841,7 +844,7 @@ def model_textio(renames):
# [END model_textio_write]
p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.run().wait_until_finish()
def model_datastoreio():
@@ -954,8 +957,7 @@ def model_composite_transform_example(contents, output_path):
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
(p
| beam.Create(contents)
| CountWords()
@@ -967,8 +969,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
"""Merging a PCollection with Flatten."""
some_hash_fn = lambda s: ord(s[0])
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
# Partition into deciles
@@ -1005,8 +1006,7 @@ def model_multiple_pcollections_partition(contents, output_path):
"""Assume i in [0,100)."""
return i
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
students = p | beam.Create(contents)
@@ -1032,8 +1032,7 @@ def model_group_by_key(contents, output_path):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
words_and_counts = (
p
| beam.Create(contents)
@@ -1056,8 +1055,7 @@ def model_group_by_key(contents, output_path):
def model_co_group_by_key_tuple(email_list, phone_list, output_path):
"""Applying a CoGroupByKey Transform to a tuple."""
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
# [START model_group_by_key_cogroupbykey_tuple]
# Each data set is represented by key-value pairs in separate PCollections.
# Both data sets share a common key type (in this example str).
@@ -1094,9 +1092,8 @@ def model_join_using_side_inputs(
import apache_beam as beam
from apache_beam.pvalue import AsIter
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
# [START model_join_using_side_inputs]
# This code performs a join by receiving the set of names as an input and
# passing PCollections that contain emails and phone numbers as side inputs
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ffe0f58..34863f1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -20,7 +20,6 @@
import glob
import logging
import os
-import sys
import tempfile
import unittest
import uuid
@@ -35,6 +34,7 @@ from apache_beam.utils.pipeline_options import TypeOptions
from apache_beam.examples.snippets import snippets
# pylint: disable=expression-not-assigned
+from apache_beam.test_pipeline import TestPipeline
class ParDoTest(unittest.TestCase):
@@ -110,7 +110,7 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({1, 2, 4}, set(result))
def test_pardo_side_input(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
# [START model_pardo_side_input]
@@ -228,7 +228,7 @@ class ParDoTest(unittest.TestCase):
class TypeHintsTest(unittest.TestCase):
def test_bad_types(self):
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
evens = None # pylint: disable=unused-variable
# [START type_hints_missing_define_numbers]
@@ -290,7 +290,7 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_off(self):
# pylint: disable=expression-not-assigned
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
# [START type_hints_runtime_off]
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()
@@ -298,7 +298,7 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_on(self):
# pylint: disable=expression-not-assigned
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_runtime_on]
p.options.view_as(TypeOptions).runtime_type_check = True
@@ -307,7 +307,7 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_runtime_on]
def test_deterministic_key(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
lines = (p | beam.Create(
['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d25ec3e..d0cc8a2 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -56,7 +56,7 @@ def run(argv=None):
transformed | beam.io.Write(
beam.io.PubSubSink(known_args.output_topic))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 35c1abb..adfc33d 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -66,7 +66,7 @@ def run(argv=None):
transformed | beam.io.Write(
'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 211211d..92929af 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -97,6 +97,7 @@ def run(argv=None):
# Actually run the pipeline (all operations above are deferred).
result = p.run()
+ result.wait_until_finish()
empty_line_values = result.aggregated_values(empty_line_aggregator)
logging.info('number of empty lines: %d', sum(empty_line_values.values()))
word_length_values = result.aggregated_values(average_word_size_aggregator)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 20d1c2f..ac13f35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -32,7 +32,7 @@ pipeline configuration::
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMP_DIRECTORY
--job_name YOUR_JOB_NAME
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
and an output prefix on GCS::
@@ -151,7 +151,7 @@ def run(argv=None):
| 'write' >> WriteToText(known_args.output))
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 18595d0..b80ed84 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -73,7 +73,7 @@ def run(argv=None):
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
- # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowRunner to
+ # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
# run your pipeline on the Google Cloud Dataflow Service.
'--runner=DirectRunner',
# CHANGE 3/5: Your project ID is required in order to run your pipeline on
@@ -113,7 +113,7 @@ def run(argv=None):
output | 'write' >> WriteToText(known_args.output)
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 3ff2529..2cc4684 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -26,6 +26,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.io.concat_source import ConcatSource
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -212,7 +213,7 @@ class ConcatSourceTest(unittest.TestCase):
RangeSource(10, 100),
RangeSource(100, 1000),
])
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Read(source)
assert_that(pcoll, equal_to(range(1000)))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 8f12627..7481c4c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -38,6 +38,7 @@ from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.util import assert_that
@@ -364,7 +365,7 @@ class TestFileBasedSource(unittest.TestCase):
self.assertItemsEqual(expected_data, read_data)
def _run_source_test(self, pattern, expected_data, splittable=True):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
pattern, splittable=splittable))
assert_that(pcoll, equal_to(expected_data))
@@ -404,7 +405,7 @@ class TestFileBasedSource(unittest.TestCase):
with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
@@ -419,7 +420,7 @@ class TestFileBasedSource(unittest.TestCase):
with gzip.GzipFile(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
@@ -437,7 +438,7 @@ class TestFileBasedSource(unittest.TestCase):
compressed_chunks.append(
compressobj.compress('\n'.join(c)) + compressobj.flush())
file_pattern = write_prepared_pattern(compressed_chunks)
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
@@ -456,7 +457,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
@@ -471,7 +472,7 @@ class TestFileBasedSource(unittest.TestCase):
with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
compression_type=fileio.CompressionTypes.AUTO))
@@ -485,7 +486,7 @@ class TestFileBasedSource(unittest.TestCase):
with gzip.GzipFile(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
compression_type=fileio.CompressionTypes.AUTO))
@@ -504,7 +505,7 @@ class TestFileBasedSource(unittest.TestCase):
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(
compressed_chunks, suffixes=['.gz']*len(chunks))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
compression_type=fileio.CompressionTypes.AUTO))
@@ -526,7 +527,7 @@ class TestFileBasedSource(unittest.TestCase):
chunks_to_write.append('\n'.join(c))
file_pattern = write_prepared_pattern(chunks_to_write,
suffixes=(['.gz', '']*3))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
compression_type=fileio.CompressionTypes.AUTO))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 68e2bce..8842369 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -34,6 +34,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -760,7 +761,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(f.read().splitlines(), [])
def test_write_native(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned
pipeline.run()
@@ -773,7 +774,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_native_auto_compression(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
@@ -788,7 +789,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_native_auto_compression_unsharded(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
@@ -880,7 +881,7 @@ class TestFileSink(unittest.TestCase):
temp_path = tempfile.NamedTemporaryFile().name
sink = MyFileSink(
temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
p.run()
self.assertEqual(
@@ -894,7 +895,7 @@ class TestFileSink(unittest.TestCase):
num_shards=3,
shard_name_template='_NN_SSS_',
coder=coders.ToStringCoder())
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index f0f2046..dc0fd54 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -98,7 +99,7 @@ class SourcesTest(unittest.TestCase):
def test_run_direct(self):
file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Read(LineSource(file_name))
assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 877e190..07ab4cc 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -43,6 +43,8 @@ from apache_beam.io.filebasedsource_test import write_data
from apache_beam.io.filebasedsource_test import write_pattern
from apache_beam.io.fileio import CompressionTypes
+from apache_beam.test_pipeline import TestPipeline
+
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -304,7 +306,7 @@ class TextSourceTest(unittest.TestCase):
def test_dataflow_single_file(self):
file_name, expected_data = write_data(5)
assert len(expected_data) == 5
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -319,7 +321,7 @@ class TextSourceTest(unittest.TestCase):
file_name, expected_data = write_data(5)
assert len(expected_data) == 5
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name, coder=DummyCoder())
assert_that(pcoll, equal_to([record * 2 for record in expected_data]))
pipeline.run()
@@ -327,7 +329,7 @@ class TextSourceTest(unittest.TestCase):
def test_dataflow_file_pattern(self):
pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
assert len(expected_data) == 40
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(pattern)
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -339,7 +341,7 @@ class TextSourceTest(unittest.TestCase):
with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -351,7 +353,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -363,7 +365,7 @@ class TextSourceTest(unittest.TestCase):
with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
compression_type=CompressionTypes.BZIP2)
@@ -377,7 +379,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
0, CompressionTypes.GZIP,
@@ -392,7 +394,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
0, CompressionTypes.GZIP,
@@ -425,7 +427,7 @@ class TextSourceTest(unittest.TestCase):
def test_read_gzip_empty_file(self):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
filename,
0, CompressionTypes.GZIP,
@@ -521,7 +523,7 @@ class TextSinkTest(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_write_dataflow(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned
pipeline.run()
@@ -534,7 +536,7 @@ class TextSinkTest(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_dataflow_auto_compression(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned
pipeline.run()
@@ -547,7 +549,7 @@ class TextSinkTest(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_dataflow_auto_compression_unsharded(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 0110273..fa78d6f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -25,6 +25,7 @@ select = E3
[testenv:py27]
deps=
+ nose
pep8
pylint
commands =