You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:33 UTC
[1/6] beam git commit: Make TestPipeline.run() blocking by default.
Repository: beam
Updated Branches:
refs/heads/python-sdk f25c0e434 -> 36a7d3491
Make TestPipeline.run() blocking by default.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e49f518
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e49f518
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e49f518
Branch: refs/heads/python-sdk
Commit: 2e49f518bcb2f0ab16e4f17f75f85eb28534a1b4
Parents: 74dda50
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 13 14:09:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/test_pipeline.py | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2e49f518/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69f4ddd..c29a879 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -58,7 +58,8 @@ class TestPipeline(Pipeline):
runner=None,
options=None,
argv=None,
- is_integration_test=False):
+ is_integration_test=False,
+ blocking=True):
"""Initialize a pipeline object for test.
Args:
@@ -72,6 +73,7 @@ class TestPipeline(Pipeline):
is None.
is_integration_test: True if the test is an integration test, False
otherwise.
+ blocking: Run method will wait until pipeline execution is completed.
Raises:
ValueError: if either the runner or options argument is not of the
@@ -79,10 +81,17 @@ class TestPipeline(Pipeline):
"""
self.is_integration_test = is_integration_test
self.options_list = self._parse_test_option_args(argv)
+ self.blocking = blocking
if options is None:
options = PipelineOptions(self.options_list)
super(TestPipeline, self).__init__(runner, options)
+ def run(self):
+ result = super(TestPipeline, self).run()
+ if self.blocking:
+ result.wait_until_finish()
+ return result
+
def _parse_test_option_args(self, argv):
"""Parse value of command line argument: --test-pipeline-options to get
pipeline options.
[4/6] beam git commit: Update tests to use TestPipeline()
Posted by ro...@apache.org.
Update tests to use TestPipeline()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ded9185
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ded9185
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ded9185
Branch: refs/heads/python-sdk
Commit: 4ded9185c063eae9f54ce457a1fa753679d1ce82
Parents: 703c1bc
Author: Ahmet Altay <al...@google.com>
Authored: Sun Jan 15 00:35:49 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:01:41 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 13 +--
.../apache_beam/tests/pipeline_verifiers.py | 4 +-
.../apache_beam/transforms/aggregator_test.py | 3 +-
.../apache_beam/transforms/combiners_test.py | 26 +++---
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 93 ++++++++++----------
.../apache_beam/transforms/window_test.py | 10 +--
.../transforms/write_ptransform_test.py | 5 +-
.../typehints/typed_pipeline_test.py | 6 +-
9 files changed, 81 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d6925d4..336bf54 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -25,6 +25,7 @@ from apache_beam.pipeline import Pipeline
from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombineGlobally
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
@@ -94,7 +95,7 @@ class PipelineTest(unittest.TestCase):
self.leave_composite.append(transform_node)
def test_create(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = TestPipeline(runner=self.runner_name)
pcoll = pipeline | 'label1' >> Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))
@@ -105,13 +106,13 @@ class PipelineTest(unittest.TestCase):
pipeline.run()
def test_create_singleton_pcollection(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = TestPipeline(runner=self.runner_name)
pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
assert_that(pcoll, equal_to([[1, 2, 3]]))
pipeline.run()
def test_read(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = TestPipeline(runner=self.runner_name)
pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
assert_that(pcoll, equal_to([1, 2, 3]))
pipeline.run()
@@ -136,7 +137,7 @@ class PipelineTest(unittest.TestCase):
self.assertEqual(visitor.leave_composite[0].transform, transform)
def test_apply_custom_transform(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = TestPipeline(runner=self.runner_name)
pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
result = pcoll | PipelineTest.CustomTransform()
assert_that(result, equal_to([2, 3, 4]))
@@ -158,7 +159,7 @@ class PipelineTest(unittest.TestCase):
'pvalue | "label" >> transform')
def test_reuse_cloned_custom_transform_instance(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = TestPipeline(runner=self.runner_name)
pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
transform = PipelineTest.CustomTransform()
@@ -207,7 +208,7 @@ class PipelineTest(unittest.TestCase):
num_elements = 10
num_maps = 100
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline(runner='DirectRunner')
# Consumed memory should not be proportional to the number of maps.
memory_threshold = (
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 6bf8d48..9b286a2 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -46,7 +46,7 @@ class PipelineStateMatcher(BaseMatcher):
self.expected_state = expected_state
def _matches(self, pipeline_result):
- return pipeline_result.current_state() == self.expected_state
+ return pipeline_result.state == self.expected_state
def describe_to(self, description):
description \
@@ -56,7 +56,7 @@ class PipelineStateMatcher(BaseMatcher):
def describe_mismatch(self, pipeline_result, mismatch_description):
mismatch_description \
.append_text("Test pipeline job terminated in state: ") \
- .append_text(pipeline_result.current_state())
+ .append_text(pipeline_result.state)
def retry_on_io_error_and_server_error(exception):
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index e77dfba..d493c46 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.transforms import combiners
from apache_beam.transforms.aggregator import Aggregator
+from apache_beam.test_pipeline import TestPipeline
class AggregatorTest(unittest.TestCase):
@@ -63,7 +64,7 @@ class AggregatorTest(unittest.TestCase):
for a in aggregators:
context.aggregate_to(a, context.element)
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators()) # pylint: disable=expression-not-assigned
res = p.run()
for (_, _, expected), a in zip(counter_types, aggregators):
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 72fce60..8a6d352 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -22,7 +22,7 @@ import unittest
import hamcrest as hc
import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.core import CombineGlobally
from apache_beam.transforms.core import Create
@@ -40,7 +40,7 @@ class CombineTest(unittest.TestCase):
combine.TopCombineFn._MIN_BUFFER_OVERSIZE = 1
def test_builtin_combines(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
mean = sum(vals) / float(len(vals))
@@ -62,7 +62,7 @@ class CombineTest(unittest.TestCase):
pipeline.run()
def test_top(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
# A parameter we'll be sharing with a custom comparator.
names = {0: 'zo',
@@ -201,7 +201,7 @@ class CombineTest(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_top_shorthands(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
@@ -222,7 +222,7 @@ class CombineTest(unittest.TestCase):
# First test global samples (lots of them).
for ix in xrange(300):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
@@ -241,7 +241,7 @@ class CombineTest(unittest.TestCase):
pipeline.run()
# Now test per-key samples.
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start-perkey' >> Create(
sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
@@ -258,7 +258,7 @@ class CombineTest(unittest.TestCase):
pipeline.run()
def test_tuple_combine_fn(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result = (
p
| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
@@ -269,7 +269,7 @@ class CombineTest(unittest.TestCase):
p.run()
def test_tuple_combine_fn_without_defaults(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result = (
p
| Create([1, 1, 2, 3])
@@ -280,7 +280,7 @@ class CombineTest(unittest.TestCase):
p.run()
def test_to_list_and_to_dict(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
pcoll = pipeline | 'start' >> Create(the_list)
result = pcoll | 'to list' >> combine.ToList()
@@ -292,7 +292,7 @@ class CombineTest(unittest.TestCase):
assert_that(result, matcher([the_list]))
pipeline.run()
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pairs = [(1, 2), (3, 4), (5, 6)]
pcoll = pipeline | 'start-pairs' >> Create(pairs)
result = pcoll | 'to dict' >> combine.ToDict()
@@ -306,12 +306,12 @@ class CombineTest(unittest.TestCase):
pipeline.run()
def test_combine_globally_with_default(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
p.run()
def test_combine_globally_without_default(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result = p | Create([]) | CombineGlobally(sum).without_defaults()
assert_that(result, equal_to([]))
p.run()
@@ -323,7 +323,7 @@ class CombineTest(unittest.TestCase):
main = pcoll.pipeline | Create([None])
return main | Map(lambda _, s: s, side)
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
assert_that(result1, equal_to([0]), label='r1')
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 006a937..b5ac64b 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -443,7 +443,7 @@ class PTransform(WithTypeHints, HasDisplayData):
# Get a reference to the runners internal cache, otherwise runner may
# clean it after run.
cache = p.runner.cache
- p.run()
+ p.run().wait_until_finish()
return _MaterializePValues(cache).visit(result)
def _extract_input_pvalues(self, pvalueish):
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 705e85e..58382e4 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -26,7 +26,7 @@ import unittest
import hamcrest as hc
import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
import apache_beam.pvalue as pvalue
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
@@ -36,7 +36,6 @@ import apache_beam.typehints as typehints
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import TypeOptions
@@ -54,12 +53,12 @@ class PTransformTest(unittest.TestCase):
self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
str(PTransform()))
- pa = Pipeline('DirectRunner')
+ pa = TestPipeline()
res = pa | 'a_label' >> beam.Create([1, 2])
self.assertEqual('AppliedPTransform(a_label, Create)',
str(res.producer))
- pc = Pipeline('DirectRunner')
+ pc = TestPipeline()
res = pc | beam.Create([1, 2])
inputs_tr = res.producer.transform
inputs_tr.inputs = ('ci',)
@@ -67,7 +66,7 @@ class PTransformTest(unittest.TestCase):
"""<Create(PTransform) label=[Create] inputs=('ci',)>""",
str(inputs_tr))
- pd = Pipeline('DirectRunner')
+ pd = TestPipeline()
res = pd | beam.Create([1, 2])
side_tr = res.producer.transform
side_tr.side_inputs = (4,)
@@ -111,7 +110,7 @@ class PTransformTest(unittest.TestCase):
def process(self, context, addon):
return [context.element + addon]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
assert_that(result, equal_to([11, 12, 13]))
@@ -123,20 +122,20 @@ class PTransformTest(unittest.TestCase):
def process(self, context):
pass
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
with self.assertRaises(ValueError):
pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s
def test_do_with_callable(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
def test_do_with_side_input_as_arg(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
side = pipeline | 'side' >> beam.Create([10])
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | beam.FlatMap(
@@ -145,7 +144,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_do_with_side_input_as_keyword_arg(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
side = pipeline | 'side' >> beam.Create([10])
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | beam.FlatMap(
@@ -154,7 +153,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_do_with_do_fn_returning_string_raises_warning(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
@@ -168,7 +167,7 @@ class PTransformTest(unittest.TestCase):
self.assertStartswith(cm.exception.message, expected_error_prefix)
def test_do_with_do_fn_returning_dict_raises_warning(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
@@ -182,7 +181,7 @@ class PTransformTest(unittest.TestCase):
self.assertStartswith(cm.exception.message, expected_error_prefix)
def test_do_with_side_outputs_maintains_unique_name(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
@@ -195,7 +194,7 @@ class PTransformTest(unittest.TestCase):
# iterable.
def incorrect_par_do_fn(x):
return x + 5
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
# It's a requirement that all user-defined functions to a ParDo return
@@ -216,7 +215,7 @@ class PTransformTest(unittest.TestCase):
def finish_bundle(self, c):
yield 'finish'
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | 'do' >> beam.ParDo(MyDoFn())
@@ -231,7 +230,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_filter(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
result = pcoll | beam.Filter(
'filter', lambda x: x % 2 == 0)
@@ -257,7 +256,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_combine_fn(self):
vals = [1, 2, 3, 4, 5, 6, 7]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(vals)
result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
assert_that(result, equal_to([sum(vals) / len(vals)]))
@@ -265,7 +264,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_callable(self):
vals = [1, 2, 3, 4, 5, 6, 7]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(vals)
result = pcoll | beam.CombineGlobally(sum)
assert_that(result, equal_to([sum(vals)]))
@@ -273,7 +272,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_side_input_as_arg(self):
values = [1, 2, 3, 4, 5, 6, 7]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(values)
divisor = pipeline | 'divisor' >> beam.Create([2])
result = pcoll | beam.CombineGlobally(
@@ -288,7 +287,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_per_key_with_combine_fn(self):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
@@ -299,7 +298,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_per_key_with_callable(self):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
result = pcoll | beam.CombinePerKey(sum)
@@ -309,7 +308,7 @@ class PTransformTest(unittest.TestCase):
def test_combine_per_key_with_side_input_as_arg(self):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
divisor = pipeline | 'divisor' >> beam.Create([2])
@@ -322,7 +321,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_group_by_key(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Create(
'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
result = pcoll | 'group' >> beam.GroupByKey()
@@ -336,7 +335,7 @@ class PTransformTest(unittest.TestCase):
def partition_for(self, context, num_partitions, offset):
return (context.element % 3) + offset
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
# Attempt nominal partition operation.
partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
@@ -348,14 +347,14 @@ class PTransformTest(unittest.TestCase):
# Check that a bad partition label will yield an error. For the
# DirectRunner, this error manifests as an exception.
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
with self.assertRaises(ValueError):
pipeline.run()
def test_partition_with_callable(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
partitions = (
pcoll | beam.Partition(
@@ -370,7 +369,7 @@ class PTransformTest(unittest.TestCase):
def test_partition_followed_by_flatten_and_groupbykey(self):
"""Regression test for an issue with how partitions are handled."""
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
contents = [('aa', 1), ('bb', 2), ('aa', 2)]
created = pipeline | 'A' >> beam.Create(contents)
partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
@@ -380,7 +379,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_flatten_pcollections(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
@@ -388,7 +387,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_flatten_no_pcollections(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
with self.assertRaises(ValueError):
() | 'pipeline arg missing' >> beam.Flatten()
result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
@@ -396,7 +395,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_flatten_pcollections_in_iterable(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
@@ -417,7 +416,7 @@ class PTransformTest(unittest.TestCase):
set([1, 2, 3]) | 'flatten' >> beam.Flatten()
def test_co_group_by_key_on_list(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll_1 = pipeline | beam.Create(
'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | beam.Create(
@@ -429,7 +428,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_co_group_by_key_on_iterable(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll_1 = pipeline | beam.Create(
'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | beam.Create(
@@ -442,7 +441,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_co_group_by_key_on_dict(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll_1 = pipeline | beam.Create(
'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | beam.Create(
@@ -454,7 +453,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_group_by_key_input_must_be_kv_pairs(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
with self.assertRaises(typehints.TypeCheckError) as e:
@@ -467,7 +466,7 @@ class PTransformTest(unittest.TestCase):
'Tuple[TypeVariable[K], TypeVariable[V]]')
def test_group_by_key_only_input_must_be_kv_pairs(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
with self.assertRaises(typehints.TypeCheckError) as cm:
pcolls | 'D' >> beam.GroupByKeyOnly()
@@ -478,7 +477,7 @@ class PTransformTest(unittest.TestCase):
self.assertStartswith(cm.exception.message, expected_error_prefix)
def test_keys_and_values(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Create(
'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
keys = pcoll.apply('keys', beam.Keys())
@@ -488,7 +487,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_kv_swap(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Create(
'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
result = pcoll.apply('swap', beam.KvSwap())
@@ -496,7 +495,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_remove_duplicates(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Create(
'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
result = pcoll.apply('nodupes', beam.RemoveDuplicates())
@@ -504,7 +503,7 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
def test_chained_ptransforms(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
t = (beam.Map(lambda x: (x, 1))
| beam.GroupByKey()
| beam.Map(lambda (x, ones): (x, sum(ones))))
@@ -581,7 +580,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_chained_ptransforms(self):
"""Tests that chaining gets proper nesting."""
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
map1 = beam.Map('map1', lambda x: (x, 1))
gbk = beam.GroupByKey('gbk')
map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
@@ -594,7 +593,7 @@ class PTransformLabelsTest(unittest.TestCase):
pipeline.run()
def test_apply_custom_transform_without_label(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
custom = PTransformLabelsTest.CustomTransform()
result = pipeline.apply(custom, pcoll)
@@ -604,7 +603,7 @@ class PTransformLabelsTest(unittest.TestCase):
pipeline.run()
def test_apply_custom_transform_with_label(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
custom = PTransformLabelsTest.CustomTransform('*custom*')
result = pipeline.apply(custom, pcoll)
@@ -615,7 +614,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_combine_without_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(vals)
combine = beam.CombineGlobally(sum)
result = pcoll | combine
@@ -624,7 +623,7 @@ class PTransformLabelsTest(unittest.TestCase):
pipeline.run()
def test_apply_ptransform_using_decorator(self):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
sample = SamplePTransform('*sample*')
_ = pcoll | sample
@@ -635,7 +634,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(vals)
combine = beam.CombineGlobally('*sum*', sum)
result = pcoll | combine
@@ -644,7 +643,7 @@ class PTransformLabelsTest(unittest.TestCase):
pipeline.run()
def check_label(self, ptransform, expected_label):
- pipeline = Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
actual_label = sorted(pipeline.applied_labels - {'start'})[0]
self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
@@ -728,7 +727,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
'"%s" does not start with "%s"' % (msg, prefix))
def setUp(self):
- self.p = Pipeline(options=PipelineOptions([]))
+ self.p = TestPipeline()
def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
@with_input_types(int, int)
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 856d011..d4e8e25 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
import unittest
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
from apache_beam.transforms import core
@@ -140,7 +140,7 @@ class WindowTest(unittest.TestCase):
| Map(lambda x: WindowedValue((key, x), x, [])))
def test_sliding_windows(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
result = (pcoll
| 'w' >> WindowInto(SlidingWindows(period=2, size=4))
@@ -153,7 +153,7 @@ class WindowTest(unittest.TestCase):
p.run()
def test_sessions(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
result = (pcoll
| 'w' >> WindowInto(Sessions(10))
@@ -166,7 +166,7 @@ class WindowTest(unittest.TestCase):
p.run()
def test_timestamped_value(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result = (p
| 'start' >> Create([(k, k) for k in range(10)])
| Map(lambda (x, t): TimestampedValue(x, t))
@@ -178,7 +178,7 @@ class WindowTest(unittest.TestCase):
p.run()
def test_timestamped_with_combiners(self):
- p = Pipeline('DirectRunner')
+ p = TestPipeline()
result = (p
# Create some initial test values.
| 'start' >> Create([(k, k) for k in range(10)])
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index f96dffb..3d7fbd9 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -22,10 +22,9 @@ import unittest
import apache_beam as beam
from apache_beam.io import iobase
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.util import assert_that, is_empty
-from apache_beam.utils.pipeline_options import PipelineOptions
class _TestSink(iobase.Sink):
@@ -99,7 +98,7 @@ class WriteTest(unittest.TestCase):
return_write_results=True):
write_to_test_sink = WriteToTestSink(return_init_result,
return_write_results)
- p = Pipeline(options=PipelineOptions([]))
+ p = TestPipeline()
result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
assert_that(result, is_empty())
http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 8b5e3f4..35987b7 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -23,10 +23,10 @@ import unittest
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import typehints
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.typehints import WithTypeHints
from apache_beam.utils.pipeline_options import OptionsContext
-from apache_beam.utils.pipeline_options import PipelineOptions
# These test often construct a pipeline as value | PTransform to test side
# effects (e.g. errors).
@@ -168,7 +168,7 @@ class SideInputTest(unittest.TestCase):
@typehints.with_input_types(str, int)
def repeat(s, times):
return s * times
- p = beam.Pipeline(options=PipelineOptions([]))
+ p = TestPipeline()
main_input = p | beam.Create(['a', 'bb', 'c'])
side_input = p | 'side' >> beam.Create([3])
result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
@@ -183,7 +183,7 @@ class SideInputTest(unittest.TestCase):
@typehints.with_input_types(str, typehints.Iterable[str])
def concat(glue, items):
return glue.join(sorted(items))
- p = beam.Pipeline(options=PipelineOptions([]))
+ p = TestPipeline()
main_input = p | beam.Create(['a', 'bb', 'c'])
side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
[3/6] beam git commit: Changed tests in examples/ and io/ to use
TestPipeline.
Posted by ro...@apache.org.
Changed tests in examples/ and io/ to use TestPipeline.
Removed wait_until_finish except for a few exceptions:
- tfidf: as an example usage.
- some examples in cookbook - the tests run these examples directly,
and I did not want to update the examples to use TestPipeline.
- some snippets - if the pipeline creation is part of the snippet
and it was not easy to override.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/703c1bc1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/703c1bc1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/703c1bc1
Branch: refs/heads/python-sdk
Commit: 703c1bc18ac28aa1aa75a9359c1bcc4fbdd28f35
Parents: 2e49f51
Author: Ahmet Altay <al...@google.com>
Authored: Sat Jan 14 23:49:25 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:37 2017 -0800
----------------------------------------------------------------------
.../examples/complete/autocomplete_test.py | 3 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/juliaset/juliaset/juliaset.py | 2 +-
.../complete/juliaset/juliaset/juliaset_test.py | 2 +-
.../examples/complete/juliaset/juliaset_main.py | 2 +-
.../apache_beam/examples/complete/tfidf.py | 3 +-
.../apache_beam/examples/complete/tfidf_test.py | 3 +-
.../examples/complete/top_wikipedia_sessions.py | 2 +-
.../complete/top_wikipedia_sessions_test.py | 3 +-
.../cookbook/bigquery_side_input_test.py | 3 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 2 +-
.../examples/cookbook/bigshuffle_test.py | 2 +-
.../examples/cookbook/coders_test.py | 3 +-
.../examples/cookbook/combiners_test.py | 5 +-
.../examples/cookbook/custom_ptransform.py | 4 +-
.../examples/cookbook/custom_ptransform_test.py | 3 +-
.../examples/cookbook/datastore_wordcount.py | 7 ++-
.../examples/cookbook/filters_test.py | 3 +-
.../examples/cookbook/group_with_coder.py | 2 +-
.../examples/cookbook/group_with_coder_test.py | 4 +-
.../examples/cookbook/mergecontacts.py | 2 +-
.../examples/cookbook/mergecontacts_test.py | 3 +-
.../examples/cookbook/multiple_output_pardo.py | 4 +-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 65 ++++++++++----------
.../examples/snippets/snippets_test.py | 12 ++--
.../apache_beam/examples/streaming_wordcap.py | 2 +-
.../apache_beam/examples/streaming_wordcount.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 1 +
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 3 +-
.../apache_beam/io/filebasedsource_test.py | 19 +++---
sdks/python/apache_beam/io/fileio_test.py | 11 ++--
sdks/python/apache_beam/io/sources_test.py | 3 +-
sdks/python/apache_beam/io/textio_test.py | 26 ++++----
sdks/python/tox.ini | 1 +
38 files changed, 123 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 5ed4fb5..edf95f0 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,6 +21,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -30,7 +31,7 @@ class AutocompleteTest(unittest.TestCase):
WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
def test_top_prefixes(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | beam.Create(self.WORDS)
result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 0440ecc..10010cb 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -20,8 +20,8 @@
import logging
import unittest
-import apache_beam as beam
from apache_beam.examples.complete import estimate_pi
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import DataflowAssertException
@@ -38,7 +38,7 @@ def in_between(lower, upper):
class EstimatePiTest(unittest.TestCase):
def test_basics(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
# Note: Probabilistically speaking this test can fail with a probability
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 45fc1fb..30883dc 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -113,7 +113,7 @@ def run(argv=None): # pylint: disable=missing-docstring
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
| WriteToText(known_args.coordinate_output))
# pylint: enable=expression-not-assigned
- p.run()
+ return p.run()
# Optionally render the image and save it to a file.
# TODO(silviuc): Add this functionality.
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
index c13e857..c254eb4 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
@@ -52,7 +52,7 @@ class JuliaSetTest(unittest.TestCase):
if image_file_name is not None:
args.append('--image_output=%s' % image_file_name)
- juliaset.run(args)
+ juliaset.run(args).wait_until_finish()
def test_output_file_format(self):
grid_size = 5
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
index d6ba064..0db5431 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -38,7 +38,7 @@ an example:
python juliaset_main.py \
--job_name juliaset-$USER \
--project YOUR-PROJECT \
- --runner BlockingDataflowRunner \
+ --runner DataflowRunner \
--setup_file ./setup.py \
--staging_location gs://YOUR-BUCKET/juliaset/staging \
--temp_location gs://YOUR-BUCKET/juliaset/temp \
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 59b9d6f..4d6e0d3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -200,7 +200,8 @@ def run(argv=None):
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
- p.run()
+ # Execute the pipeline and wait until it is completed.
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index deda4cb..404ab44 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -25,6 +25,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import tfidf
+from apache_beam.test_pipeline import TestPipeline
EXPECTED_RESULTS = set([
@@ -47,7 +48,7 @@ class TfIdfTest(unittest.TestCase):
f.write(contents)
def test_tfidf_transform(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
uri_to_line = p | beam.Create(
'create sample',
[('1.txt', 'abc def ghi'),
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 2dea642..4920813 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -31,7 +31,7 @@ pipeline configuration in addition to the above:
--project YOUR_PROJECT_ID
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMPORARY_DIRECTORY
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
overridden with --input.
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 1d25807..9b9d9b1 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -23,6 +23,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import top_wikipedia_sessions
+from apache_beam.test_pipeline import TestPipeline
class ComputeTopSessionsTest(unittest.TestCase):
@@ -49,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
]
def test_compute_top_sessions(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 97c41d6..926f141 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -22,12 +22,13 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_side_input
+from apache_beam.test_pipeline import TestPipeline
class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
corpus_pcoll = p | beam.Create('create_corpus',
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2cb2c45..0fabe3f 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -36,7 +36,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
results = bigquery_tornadoes.count_tornadoes(rows)
beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
{'month': 2, 'tornado_count': 1}]))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index ceeefd6..b5eacce 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -87,7 +87,7 @@ def run(argv=None):
known_args.checksum_output + '-output')
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
index d73c976..60b6acc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
@@ -38,7 +38,7 @@ class BigShuffleTest(unittest.TestCase):
bigshuffle.run([
'--input=%s*' % temp_path,
'--output=%s.result' % temp_path,
- '--checksum_output=%s.checksum' % temp_path])
+ '--checksum_output=%s.checksum' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index ec0848f..4a92abb 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import coders
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -34,7 +35,7 @@ class CodersTest(unittest.TestCase):
{'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
def test_compute_points(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
result = (records
| 'points' >> beam.FlatMap(coders.compute_points)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 18bd3bc..a8ed555 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -27,6 +27,7 @@ import logging
import unittest
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
class CombinersTest(unittest.TestCase):
@@ -44,7 +45,7 @@ class CombinersTest(unittest.TestCase):
can be used.
"""
result = (
- beam.Pipeline(runner=beam.runners.DirectRunner())
+ TestPipeline()
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(sum))
@@ -60,7 +61,7 @@ class CombinersTest(unittest.TestCase):
return result
result = (
- beam.Pipeline(runner=beam.runners.DirectRunner())
+ TestPipeline()
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(multiply))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index cfbb99d..67d1ff8 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -51,7 +51,7 @@ def run_count1(known_args, options):
(p | beam.io.ReadFromText(known_args.input)
| Count1()
| beam.io.WriteToText(known_args.output))
- p.run()
+ p.run().wait_until_finish()
@beam.ptransform_fn
@@ -70,7 +70,7 @@ def run_count2(known_args, options):
(p | ReadFromText(known_args.input)
| Count2() # pylint: disable=no-value-for-parameter
| WriteToText(known_args.output))
- p.run()
+ p.run().wait_until_finish()
@beam.ptransform_fn
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 91309ae..cd1c04a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import custom_ptransform
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -39,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
def run_pipeline(self, count_implementation, factor=1):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 8f68fb4..25abb3e 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -141,7 +141,7 @@ def write_to_datastore(project, user_options, pipeline_options):
| 'write to datastore' >> WriteToDatastore(project))
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
def make_ancestor_query(kind, namespace, ancestor):
@@ -192,7 +192,10 @@ def read_from_datastore(project, user_options, pipeline_options):
num_shards=user_options.num_shards)
# Actually run the pipeline (all operations above are deferred).
- return p.run()
+ result = p.run()
+ # Wait until completion, main thread would access post-completion job results.
+ result.wait_until_finish()
+ return result
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 81dd30f..28bb1e1 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -22,6 +22,7 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import filters
+from apache_beam.test_pipeline import TestPipeline
class FiltersTest(unittest.TestCase):
@@ -35,7 +36,7 @@ class FiltersTest(unittest.TestCase):
]
def _get_result_for_month(self, month):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
rows = (p | 'create' >> beam.Create(self.input_data))
results = filters.filter_cold_days(rows, month)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 78540d1..f6f2108 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -114,7 +114,7 @@ def run(argv=sys.argv[1:]):
| beam.CombinePerKey(sum)
| beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
| WriteToText(known_args.output))
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 268ba8d..4e87966 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase):
temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
group_with_coder.run([
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
+ '--output=%s.result' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
@@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase):
group_with_coder.run([
'--no_pipeline_type_check',
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
+ '--output=%s.result' % temp_path]).wait_until_finish()
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 2475e02..6906ae4 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -124,7 +124,7 @@ def run(argv=None, assert_results=None):
beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
label='assert:nomads')
# Execute pipeline.
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index b3be0dd..09f71d3 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -107,12 +107,13 @@ class MergeContactsTest(unittest.TestCase):
result_prefix = self.create_temp_file('')
- mergecontacts.run([
+ result = mergecontacts.run([
'--input_email=%s' % path_email,
'--input_phone=%s' % path_phone,
'--input_snailmail=%s' % path_snailmail,
'--output_tsv=%s.tsv' % result_prefix,
'--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
+ result.wait_until_finish()
with open('%s.tsv-00000-of-00001' % result_prefix) as f:
contents = f.read()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 3acebc6..a877a1d 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -40,7 +40,7 @@ pipeline configuration:
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMP_DIRECTORY
--job_name YOUR_JOB_NAME
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
and an output prefix on GCS:
--output gs://YOUR_OUTPUT_PREFIX
@@ -173,7 +173,7 @@ def run(argv=None):
| 'count words' >> CountWords()
| 'write words' >> WriteToText(known_args.output + '-words'))
- p.run()
+ return p.run()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 3ddd668..2c9111c 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase):
multiple_output_pardo.run([
'--input=%s*' % temp_path,
- '--output=%s' % result_prefix])
+ '--output=%s' % result_prefix]).wait_until_finish()
expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
with open(result_prefix + '-chars-00000-of-00001') as f:
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 2eadc44..0cba1af 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -31,6 +31,7 @@ string. The tags can contain only letters, digits and _.
"""
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
@@ -88,6 +89,8 @@ def construct_pipeline(renames):
p = beam.Pipeline(options=PipelineOptions())
# [END pipelines_constructing_creating]
+ p = TestPipeline() # Use TestPipeline for testing.
+
# [START pipelines_constructing_reading]
lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
# [END pipelines_constructing_reading]
@@ -106,8 +109,9 @@ def construct_pipeline(renames):
p.visit(SnippetUtils.RenameFiles(renames))
# [START pipelines_constructing_running]
- p.run()
+ result = p.run()
# [END pipelines_constructing_running]
+ result
def model_pipelines(argv):
@@ -144,8 +148,9 @@ def model_pipelines(argv):
| beam.combiners.Count.PerKey()
| beam.io.WriteToText(my_options.output))
- p.run()
+ result = p.run()
# [END model_pipelines]
+ result.wait_until_finish()
def model_pcollection(argv):
@@ -175,8 +180,9 @@ def model_pcollection(argv):
'Or to take arms against a sea of troubles, '])
| beam.io.WriteToText(my_options.output))
- p.run()
+ result = p.run()
# [END model_pcollection]
+ result.wait_until_finish()
def pipeline_options_remote(argv):
@@ -206,8 +212,7 @@ def pipeline_options_remote(argv):
options = PipelineOptions(flags=argv)
# For Cloud execution, set the Cloud Platform project, job_name,
- # staging location, temp_location and specify DataflowRunner or
- # BlockingDataflowRunner.
+ # staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
@@ -223,9 +228,7 @@ def pipeline_options_remote(argv):
my_input = my_options.input
my_output = my_options.output
- # Overriding the runner for tests.
- options.view_as(StandardOptions).runner = 'DirectRunner'
- p = Pipeline(options=options)
+ p = TestPipeline() # Use TestPipeline for testing.
lines = p | beam.io.ReadFromText(my_input)
lines | beam.io.WriteToText(my_output)
@@ -265,6 +268,7 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
+ p = TestPipeline() # Use TestPipeline for testing.
lines = p | beam.io.ReadFromText(my_input)
lines | beam.io.WriteToText(my_output)
p.run()
@@ -288,7 +292,7 @@ def pipeline_options_command_line(argv):
lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
# [END pipeline_options_command_line]
- p.run()
+ p.run().wait_until_finish()
def pipeline_logging(lines, output):
@@ -296,7 +300,6 @@ def pipeline_logging(lines, output):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
# [START pipeline_logging]
# import Python logging module.
@@ -316,7 +319,7 @@ def pipeline_logging(lines, output):
# Remaining WordCount example code ...
# [END pipeline_logging]
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
(p
| beam.Create(lines)
| beam.ParDo(ExtractWordsFn())
@@ -372,7 +375,7 @@ def pipeline_monitoring(renames):
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(WordCountOptions)
- p = beam.Pipeline(options=pipeline_options)
+ p = TestPipeline() # Use TestPipeline for testing.
# [START pipeline_monitoring_execution]
(p
@@ -405,7 +408,7 @@ def examples_wordcount_minimal(renames):
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
- options.view_as(StandardOptions).runner = 'BlockingDataflowRunner'
+ options.view_as(StandardOptions).runner = 'DataflowRunner'
# [END examples_wordcount_minimal_options]
# Run it locally for testing.
@@ -441,8 +444,9 @@ def examples_wordcount_minimal(renames):
p.visit(SnippetUtils.RenameFiles(renames))
# [START examples_wordcount_minimal_run]
- p.run()
+ result = p.run()
# [END examples_wordcount_minimal_run]
+ result.wait_until_finish()
def examples_wordcount_wordcount(renames):
@@ -497,7 +501,7 @@ def examples_wordcount_wordcount(renames):
formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.run().wait_until_finish()
def examples_wordcount_debugging(renames):
@@ -505,7 +509,6 @@ def examples_wordcount_debugging(renames):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
# [START example_wordcount_debugging_logging]
# [START example_wordcount_debugging_aggregators]
@@ -546,7 +549,7 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_logging]
# [END example_wordcount_debugging_aggregators]
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
filtered_words = (
p
| beam.io.ReadFromText(
@@ -649,7 +652,7 @@ def model_custom_source(count):
lines, beam.equal_to(
['line ' + str(number) for number in range(0, count)]))
- p.run()
+ p.run().wait_until_finish()
# We recommend users to start Source classes with an underscore to discourage
# using the Source class directly when a PTransform for the source is
@@ -679,7 +682,7 @@ def model_custom_source(count):
lines, beam.equal_to(
['line ' + str(number) for number in range(0, count)]))
- p.run()
+ p.run().wait_until_finish()
def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
@@ -781,7 +784,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
final_table_name))
# [END model_custom_sink_use_new_sink]
- p.run()
+ p.run().wait_until_finish()
# We recommend users to start Sink class names with an underscore to
# discourage using the Sink class directly when a PTransform for the sink is
@@ -812,7 +815,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
'http://url_to_simple_kv/', final_table_name)
# [END model_custom_sink_use_ptransform]
- p.run()
+ p.run().wait_until_finish()
def model_textio(renames):
@@ -841,7 +844,7 @@ def model_textio(renames):
# [END model_textio_write]
p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.run().wait_until_finish()
def model_datastoreio():
@@ -954,8 +957,7 @@ def model_composite_transform_example(contents, output_path):
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
(p
| beam.Create(contents)
| CountWords()
@@ -967,8 +969,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
"""Merging a PCollection with Flatten."""
some_hash_fn = lambda s: ord(s[0])
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
# Partition into deciles
@@ -1005,8 +1006,7 @@ def model_multiple_pcollections_partition(contents, output_path):
"""Assume i in [0,100)."""
return i
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
students = p | beam.Create(contents)
@@ -1032,8 +1032,7 @@ def model_group_by_key(contents, output_path):
import re
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
words_and_counts = (
p
| beam.Create(contents)
@@ -1056,8 +1055,7 @@ def model_group_by_key(contents, output_path):
def model_co_group_by_key_tuple(email_list, phone_list, output_path):
"""Applying a CoGroupByKey Transform to a tuple."""
import apache_beam as beam
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
# [START model_group_by_key_cogroupbykey_tuple]
# Each data set is represented by key-value pairs in separate PCollections.
# Both data sets share a common key type (in this example str).
@@ -1094,9 +1092,8 @@ def model_join_using_side_inputs(
import apache_beam as beam
from apache_beam.pvalue import AsIter
- from apache_beam.utils.pipeline_options import PipelineOptions
- p = beam.Pipeline(options=PipelineOptions())
+ p = TestPipeline() # Use TestPipeline for testing.
# [START model_join_using_side_inputs]
# This code performs a join by receiving the set of names as an input and
# passing PCollections that contain emails and phone numbers as side inputs
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ffe0f58..34863f1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -20,7 +20,6 @@
import glob
import logging
import os
-import sys
import tempfile
import unittest
import uuid
@@ -35,6 +34,7 @@ from apache_beam.utils.pipeline_options import TypeOptions
from apache_beam.examples.snippets import snippets
# pylint: disable=expression-not-assigned
+from apache_beam.test_pipeline import TestPipeline
class ParDoTest(unittest.TestCase):
@@ -110,7 +110,7 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({1, 2, 4}, set(result))
def test_pardo_side_input(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
# [START model_pardo_side_input]
@@ -228,7 +228,7 @@ class ParDoTest(unittest.TestCase):
class TypeHintsTest(unittest.TestCase):
def test_bad_types(self):
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
evens = None # pylint: disable=unused-variable
# [START type_hints_missing_define_numbers]
@@ -290,7 +290,7 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_off(self):
# pylint: disable=expression-not-assigned
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
# [START type_hints_runtime_off]
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()
@@ -298,7 +298,7 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_on(self):
# pylint: disable=expression-not-assigned
- p = beam.Pipeline('DirectRunner', argv=sys.argv)
+ p = TestPipeline()
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_runtime_on]
p.options.view_as(TypeOptions).runtime_type_check = True
@@ -307,7 +307,7 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_runtime_on]
def test_deterministic_key(self):
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
lines = (p | beam.Create(
['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d25ec3e..d0cc8a2 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -56,7 +56,7 @@ def run(argv=None):
transformed | beam.io.Write(
beam.io.PubSubSink(known_args.output_topic))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 35c1abb..adfc33d 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -66,7 +66,7 @@ def run(argv=None):
transformed | beam.io.Write(
'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 211211d..92929af 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -97,6 +97,7 @@ def run(argv=None):
# Actually run the pipeline (all operations above are deferred).
result = p.run()
+ result.wait_until_finish()
empty_line_values = result.aggregated_values(empty_line_aggregator)
logging.info('number of empty lines: %d', sum(empty_line_values.values()))
word_length_values = result.aggregated_values(average_word_size_aggregator)
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 20d1c2f..ac13f35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -32,7 +32,7 @@ pipeline configuration::
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMP_DIRECTORY
--job_name YOUR_JOB_NAME
- --runner BlockingDataflowRunner
+ --runner DataflowRunner
and an output prefix on GCS::
@@ -151,7 +151,7 @@ def run(argv=None):
| 'write' >> WriteToText(known_args.output))
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 18595d0..b80ed84 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -73,7 +73,7 @@ def run(argv=None):
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
- # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowRunner to
+ # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
# run your pipeline on the Google Cloud Dataflow Service.
'--runner=DirectRunner',
# CHANGE 3/5: Your project ID is required in order to run your pipeline on
@@ -113,7 +113,7 @@ def run(argv=None):
output | 'write' >> WriteToText(known_args.output)
# Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 3ff2529..2cc4684 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -26,6 +26,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.io.concat_source import ConcatSource
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -212,7 +213,7 @@ class ConcatSourceTest(unittest.TestCase):
RangeSource(10, 100),
RangeSource(100, 1000),
])
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Read(source)
assert_that(pcoll, equal_to(range(1000)))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 8f12627..7481c4c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -38,6 +38,7 @@ from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.util import assert_that
@@ -364,7 +365,7 @@ class TestFileBasedSource(unittest.TestCase):
self.assertItemsEqual(expected_data, read_data)
def _run_source_test(self, pattern, expected_data, splittable=True):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
pattern, splittable=splittable))
assert_that(pcoll, equal_to(expected_data))
@@ -404,7 +405,7 @@ class TestFileBasedSource(unittest.TestCase):
with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
@@ -419,7 +420,7 @@ class TestFileBasedSource(unittest.TestCase):
with gzip.GzipFile(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
@@ -437,7 +438,7 @@ class TestFileBasedSource(unittest.TestCase):
compressed_chunks.append(
compressobj.compress('\n'.join(c)) + compressobj.flush())
file_pattern = write_prepared_pattern(compressed_chunks)
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
@@ -456,7 +457,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
@@ -471,7 +472,7 @@ class TestFileBasedSource(unittest.TestCase):
with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
compression_type=fileio.CompressionTypes.AUTO))
@@ -485,7 +486,7 @@ class TestFileBasedSource(unittest.TestCase):
with gzip.GzipFile(filename, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
compression_type=fileio.CompressionTypes.AUTO))
@@ -504,7 +505,7 @@ class TestFileBasedSource(unittest.TestCase):
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(
compressed_chunks, suffixes=['.gz']*len(chunks))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
compression_type=fileio.CompressionTypes.AUTO))
@@ -526,7 +527,7 @@ class TestFileBasedSource(unittest.TestCase):
chunks_to_write.append('\n'.join(c))
file_pattern = write_prepared_pattern(chunks_to_write,
suffixes=(['.gz', '']*3))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
compression_type=fileio.CompressionTypes.AUTO))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 68e2bce..8842369 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -34,6 +34,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -760,7 +761,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(f.read().splitlines(), [])
def test_write_native(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned
pipeline.run()
@@ -773,7 +774,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_native_auto_compression(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
@@ -788,7 +789,7 @@ class TestNativeTextFileSink(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_native_auto_compression_unsharded(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
@@ -880,7 +881,7 @@ class TestFileSink(unittest.TestCase):
temp_path = tempfile.NamedTemporaryFile().name
sink = MyFileSink(
temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
p.run()
self.assertEqual(
@@ -894,7 +895,7 @@ class TestFileSink(unittest.TestCase):
num_shards=3,
shard_name_template='_NN_SSS_',
coder=coders.ToStringCoder())
- p = beam.Pipeline('DirectRunner')
+ p = TestPipeline()
p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index f0f2046..dc0fd54 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -98,7 +99,7 @@ class SourcesTest(unittest.TestCase):
def test_run_direct(self):
file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.Read(LineSource(file_name))
assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 877e190..07ab4cc 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -43,6 +43,8 @@ from apache_beam.io.filebasedsource_test import write_data
from apache_beam.io.filebasedsource_test import write_pattern
from apache_beam.io.fileio import CompressionTypes
+from apache_beam.test_pipeline import TestPipeline
+
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -304,7 +306,7 @@ class TextSourceTest(unittest.TestCase):
def test_dataflow_single_file(self):
file_name, expected_data = write_data(5)
assert len(expected_data) == 5
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -319,7 +321,7 @@ class TextSourceTest(unittest.TestCase):
file_name, expected_data = write_data(5)
assert len(expected_data) == 5
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name, coder=DummyCoder())
assert_that(pcoll, equal_to([record * 2 for record in expected_data]))
pipeline.run()
@@ -327,7 +329,7 @@ class TextSourceTest(unittest.TestCase):
def test_dataflow_file_pattern(self):
pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
assert len(expected_data) == 40
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(pattern)
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -339,7 +341,7 @@ class TextSourceTest(unittest.TestCase):
with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -351,7 +353,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(file_name)
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -363,7 +365,7 @@ class TextSourceTest(unittest.TestCase):
with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
compression_type=CompressionTypes.BZIP2)
@@ -377,7 +379,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
0, CompressionTypes.GZIP,
@@ -392,7 +394,7 @@ class TextSourceTest(unittest.TestCase):
with gzip.GzipFile(file_name, 'wb') as f:
f.write('\n'.join(lines))
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
file_name,
0, CompressionTypes.GZIP,
@@ -425,7 +427,7 @@ class TextSourceTest(unittest.TestCase):
def test_read_gzip_empty_file(self):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromText(
filename,
0, CompressionTypes.GZIP,
@@ -521,7 +523,7 @@ class TextSinkTest(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_write_dataflow(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned
pipeline.run()
@@ -534,7 +536,7 @@ class TextSinkTest(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_dataflow_auto_compression(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned
pipeline.run()
@@ -547,7 +549,7 @@ class TextSinkTest(unittest.TestCase):
self.assertEqual(read_result, self.lines)
def test_write_dataflow_auto_compression_unsharded(self):
- pipeline = beam.Pipeline('DirectRunner')
+ pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create('Create', self.lines)
pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 0110273..fa78d6f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -25,6 +25,7 @@ select = E3
[testenv:py27]
deps=
+ nose
pep8
pylint
commands =
[5/6] beam git commit: Add dependency comments to tox file.
Posted by ro...@apache.org.
Add dependency comments to tox file.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2197009d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2197009d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2197009d
Branch: refs/heads/python-sdk
Commit: 2197009d4048fb0b76454c479171d1ffd2d57844
Parents: 4ded918
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jan 17 15:41:45 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:01:41 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/examples/snippets/snippets.py | 3 +--
sdks/python/tox.ini | 2 ++
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2197009d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0cba1af..ff240eb 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -109,9 +109,8 @@ def construct_pipeline(renames):
p.visit(SnippetUtils.RenameFiles(renames))
# [START pipelines_constructing_running]
- result = p.run()
+ p.run()
# [END pipelines_constructing_running]
- result
def model_pipelines(argv):
http://git-wip-us.apache.org/repos/asf/beam/blob/2197009d/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index fa78d6f..20afa58 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -24,6 +24,8 @@ envlist = py27
select = E3
[testenv:py27]
+# autocomplete_test depends on nose when invoked directly.
+# run_pylint.sh depends on pep8 and pylint.
deps=
nose
pep8
[6/6] beam git commit: Closes #1762
Posted by ro...@apache.org.
Closes #1762
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36a7d349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36a7d349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36a7d349
Branch: refs/heads/python-sdk
Commit: 36a7d3491b4929f7539f2aa620b4ad25865036f6
Parents: f25c0e4 2197009
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jan 18 10:10:50 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:10:50 2017 -0800
----------------------------------------------------------------------
.../examples/complete/autocomplete_test.py | 3 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/juliaset/juliaset/juliaset.py | 2 +-
.../complete/juliaset/juliaset/juliaset_test.py | 2 +-
.../examples/complete/juliaset/juliaset_main.py | 2 +-
.../apache_beam/examples/complete/tfidf.py | 3 +-
.../apache_beam/examples/complete/tfidf_test.py | 3 +-
.../examples/complete/top_wikipedia_sessions.py | 2 +-
.../complete/top_wikipedia_sessions_test.py | 3 +-
.../cookbook/bigquery_side_input_test.py | 3 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 2 +-
.../examples/cookbook/bigshuffle_test.py | 2 +-
.../examples/cookbook/coders_test.py | 3 +-
.../examples/cookbook/combiners_test.py | 5 +-
.../examples/cookbook/custom_ptransform.py | 4 +-
.../examples/cookbook/custom_ptransform_test.py | 3 +-
.../examples/cookbook/datastore_wordcount.py | 7 +-
.../examples/cookbook/filters_test.py | 3 +-
.../examples/cookbook/group_with_coder.py | 2 +-
.../examples/cookbook/group_with_coder_test.py | 4 +-
.../examples/cookbook/mergecontacts.py | 2 +-
.../examples/cookbook/mergecontacts_test.py | 3 +-
.../examples/cookbook/multiple_output_pardo.py | 4 +-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 62 ++++++-------
.../examples/snippets/snippets_test.py | 12 +--
.../apache_beam/examples/streaming_wordcap.py | 2 +-
.../apache_beam/examples/streaming_wordcount.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 1 +
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 3 +-
.../apache_beam/io/filebasedsource_test.py | 19 ++--
sdks/python/apache_beam/io/fileio_test.py | 11 +--
sdks/python/apache_beam/io/sources_test.py | 3 +-
sdks/python/apache_beam/io/textio_test.py | 26 +++---
sdks/python/apache_beam/pipeline.py | 2 +-
sdks/python/apache_beam/pipeline_test.py | 13 +--
.../apache_beam/runners/dataflow_runner.py | 60 +++++++++----
.../apache_beam/runners/direct/direct_runner.py | 15 ++--
sdks/python/apache_beam/runners/runner.py | 35 +++++++-
sdks/python/apache_beam/runners/runner_test.py | 1 +
.../apache_beam/runners/template_runner_test.py | 4 +-
.../runners/test/test_dataflow_runner.py | 4 +-
sdks/python/apache_beam/test_pipeline.py | 11 ++-
.../apache_beam/tests/pipeline_verifiers.py | 4 +-
.../apache_beam/transforms/aggregator_test.py | 3 +-
.../apache_beam/transforms/combiners_test.py | 26 +++---
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 93 ++++++++++----------
.../apache_beam/transforms/window_test.py | 10 +--
.../transforms/write_ptransform_test.py | 5 +-
.../typehints/typed_pipeline_test.py | 6 +-
.../apache_beam/utils/pipeline_options.py | 3 +-
sdks/python/run_postcommit.sh | 2 +-
sdks/python/tox.ini | 3 +
57 files changed, 304 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
[2/6] beam git commit: Implement wait_until_finish method for
existing runners.
Posted by ro...@apache.org.
Implement wait_until_finish method for existing runners.
Also defines a cancel() method (not implemented) and updates existing
usages to use wait_until_finish() instead of blocking runners.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74dda50e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74dda50e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74dda50e
Branch: refs/heads/python-sdk
Commit: 74dda50e64a93ab3c147ac24f7436ef04467aa27
Parents: f25c0e4
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jan 9 18:23:20 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 2 +-
.../apache_beam/runners/dataflow_runner.py | 60 +++++++++++++-------
.../apache_beam/runners/direct/direct_runner.py | 15 ++---
sdks/python/apache_beam/runners/runner.py | 35 +++++++++++-
sdks/python/apache_beam/runners/runner_test.py | 1 +
.../apache_beam/runners/template_runner_test.py | 4 +-
.../runners/test/test_dataflow_runner.py | 4 +-
.../apache_beam/utils/pipeline_options.py | 3 +-
sdks/python/run_postcommit.sh | 2 +-
9 files changed, 90 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6517960..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -167,7 +167,7 @@ class Pipeline(object):
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
- self.run()
+ self.run().wait_until_finish()
def visit(self, visitor):
"""Visits depth-first every node of a pipeline's DAG.
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3505acc..330472b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,7 @@ class DataflowRunner(PipelineRunner):
if not page_token:
break
- runner.result = DataflowPipelineResult(response)
+ runner.result = DataflowPipelineResult(response, runner)
runner.last_error_msg = last_error_msg
def run(self, pipeline):
@@ -176,23 +176,11 @@ class DataflowRunner(PipelineRunner):
# Create the job
self.result = DataflowPipelineResult(
- self.dataflow_client.create_job(self.job))
+ self.dataflow_client.create_job(self.job), self)
if self.result.has_job and self.blocking:
- thread = threading.Thread(
- target=DataflowRunner.poll_for_job_completion,
- args=(self, self.result.job_id()))
- # Mark the thread as a daemon thread so a keyboard interrupt on the main
- # thread will terminate everything. This is also the reason we will not
- # use thread.join() to wait for the polling thread.
- thread.daemon = True
- thread.start()
- while thread.isAlive():
- time.sleep(5.0)
- if self.result.current_state() != PipelineState.DONE:
- raise DataflowRuntimeException(
- 'Dataflow pipeline failed:\n%s'
- % getattr(self, 'last_error_msg', None), self.result)
+ self.result.wait_until_finish()
+
return self.result
def _get_typehint_based_encoding(self, typehint, window_coder):
@@ -651,9 +639,10 @@ class DataflowRunner(PipelineRunner):
class DataflowPipelineResult(PipelineResult):
"""Represents the state of a pipeline run on the Dataflow service."""
- def __init__(self, job):
+ def __init__(self, job, runner):
"""Job is a Job message from the Dataflow API."""
self._job = job
+ self._runner = runner
def job_id(self):
return self._job.id
@@ -662,12 +651,16 @@ class DataflowPipelineResult(PipelineResult):
def has_job(self):
return self._job is not None
- def current_state(self):
+ @property
+ def state(self):
"""Return the current state of the remote job.
Returns:
A PipelineState object.
"""
+ if not self.has_job:
+ return PipelineState.UNKNOWN
+
values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
api_jobstate_map = {
values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
@@ -684,11 +677,40 @@ class DataflowPipelineResult(PipelineResult):
return (api_jobstate_map[self._job.currentState] if self._job.currentState
else PipelineState.UNKNOWN)
+ def _is_in_terminal_state(self):
+ if not self.has_job:
+ return True
+
+ return self.state in [
+ PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
+ PipelineState.CANCELLED, PipelineState.DRAINED]
+
+ def wait_until_finish(self, duration=None):
+ if not self._is_in_terminal_state():
+ if not self.has_job:
+ raise IOError('Failed to get the Dataflow job id.')
+ if duration:
+ raise NotImplementedError(
+ 'DataflowRunner does not support duration argument.')
+
+ thread = threading.Thread(
+ target=DataflowRunner.poll_for_job_completion,
+ args=(self._runner, self.job_id()))
+
+ # Mark the thread as a daemon thread so a keyboard interrupt on the main
+ # thread will terminate everything. This is also the reason we will not
+ # use thread.join() to wait for the polling thread.
+ thread.daemon = True
+ thread.start()
+ while thread.isAlive():
+ time.sleep(5.0)
+ return self.state
+
def __str__(self):
return '<%s %s %s>' % (
self.__class__.__name__,
self.job_id(),
- self.current_state())
+ self.state)
def __repr__(self):
return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index a5c616b..dc2668d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -43,7 +43,7 @@ class DirectRunner(PipelineRunner):
def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
- # TODO: Move imports to top. Pipeline <-> Runner dependecy cause problems
+ # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \
@@ -76,12 +76,10 @@ class DirectRunner(PipelineRunner):
executor.start(self.visitor.root_transforms)
result = DirectPipelineResult(executor, evaluation_context)
- # TODO(altay): If blocking:
- # Block until the pipeline completes. This call will return after the
- # pipeline was fully terminated (successfully or with a failure).
- result.await_completion()
-
if self._cache:
+ # We are running in eager mode, block until the pipeline execution
+ # completes in order to have full results in the cache.
+ result.wait_until_finish()
self._cache.finalize()
return result
@@ -141,8 +139,11 @@ class DirectPipelineResult(PipelineResult):
def _is_in_terminal_state(self):
return self._state is not PipelineState.RUNNING
- def await_completion(self):
+ def wait_until_finish(self, duration=None):
if not self._is_in_terminal_state():
+ if duration:
+ raise NotImplementedError(
+ 'DirectRunner does not support duration argument.')
try:
self._executor.await_completion()
self._state = PipelineState.DONE
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 3dc4d28..1a50df4 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -287,7 +287,7 @@ class PValueCache(object):
class PipelineState(object):
- """State of the Pipeline, as returned by PipelineResult.current_state().
+ """State of the Pipeline, as returned by PipelineResult.state.
This is meant to be the union of all the states any runner can put a
pipeline in. Currently, it represents the values of the dataflow
@@ -310,10 +310,39 @@ class PipelineResult(object):
def __init__(self, state):
self._state = state
- def current_state(self):
- """Return the current state of running the pipeline."""
+ @property
+ def state(self):
+ """Return the current state of the pipeline execution."""
return self._state
+ def wait_until_finish(self, duration=None):
+ """Waits until the pipeline finishes and returns the final status.
+
+ Args:
+ duration: The time to wait (in milliseconds) for job to finish. If it is
+ set to None, it will wait indefinitely until the job is finished.
+
+ Raises:
+ IOError: If there is a persistent problem getting job information.
+ NotImplementedError: If the runner does not support this operation.
+
+ Returns:
+ The final state of the pipeline, or None on timeout.
+ """
+ raise NotImplementedError
+
+ def cancel(self):
+ """Cancels the pipeline execution.
+
+ Raises:
+ IOError: If there is a persistent problem getting job information.
+ NotImplementedError: If the runner does not support this operation.
+
+ Returns:
+ The final state of the pipeline.
+ """
+ raise NotImplementedError
+
# pylint: disable=unused-argument
def aggregated_values(self, aggregator_or_name):
"""Return a dict of step names to values of the Aggregator."""
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ea86061..2b6c316 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -161,6 +161,7 @@ class RunnerTest(unittest.TestCase):
(p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
| 'do' >> beam.ParDo(MyDoFn()))
result = p.run()
+ result.wait_until_finish()
metrics = result.metrics().query()
namespace = '{}.{}'.format(MyDoFn.__module__,
MyDoFn.__name__)
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 457022d..af7f2c6 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -55,7 +55,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
'--no_auth=True']))
pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
- pipeline.run()
+ pipeline.run().wait_until_finish()
with open(dummy_file_name) as template_file:
saved_job_dict = json.load(template_file)
self.assertEqual(
@@ -81,7 +81,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
remote_runner.job = apiclient.Job(pipeline.options)
with self.assertRaises(IOError):
- pipeline.run()
+ pipeline.run().wait_until_finish()
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index 77655bd..823e534 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -25,14 +25,16 @@ from apache_beam.utils.pipeline_options import TestOptions
class TestDataflowRunner(DataflowRunner):
def __init__(self):
- super(TestDataflowRunner, self).__init__(blocking=True)
+ super(TestDataflowRunner, self).__init__()
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
self.result = super(TestDataflowRunner, self).run(pipeline)
+ self.result.wait_until_finish()
options = pipeline.options.view_as(TestOptions)
if options.on_success_matcher:
from hamcrest import assert_that as hc_assert_that
hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+
return self.result
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 9f57ee7..16b1640 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -182,8 +182,7 @@ class StandardOptions(PipelineOptions):
parser.add_argument(
'--runner',
help=('Pipeline runner used to execute the workflow. Valid values are '
- 'DirectRunner, DataflowRunner, '
- 'and BlockingDataflowRunner.'))
+ 'DirectRunner, DataflowRunner.'))
# Whether to enable streaming mode.
parser.add_argument('--streaming',
default=False,
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 67a257e..2e419a5 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -74,7 +74,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS"
python setup.py nosetests \
-a ValidatesRunner --test-pipeline-options=" \
- --runner=BlockingDataflowRunner \
+ --runner=TestDataflowRunner \
--project=$PROJECT \
--staging_location=$GCS_LOCATION/staging-validatesrunner-test \
--temp_location=$GCS_LOCATION/temp-validatesrunner-test \