Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:33 UTC

[1/6] beam git commit: Make TestPipeline.run() blocking by default.

Repository: beam
Updated Branches:
  refs/heads/python-sdk f25c0e434 -> 36a7d3491


Make TestPipeline.run() blocking by default.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e49f518
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e49f518
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e49f518

Branch: refs/heads/python-sdk
Commit: 2e49f518bcb2f0ab16e4f17f75f85eb28534a1b4
Parents: 74dda50
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 13 14:09:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/test_pipeline.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2e49f518/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69f4ddd..c29a879 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -58,7 +58,8 @@ class TestPipeline(Pipeline):
                runner=None,
                options=None,
                argv=None,
-               is_integration_test=False):
+               is_integration_test=False,
+               blocking=True):
     """Initialize a pipeline object for test.
 
     Args:
@@ -72,6 +73,7 @@ class TestPipeline(Pipeline):
         is None.
       is_integration_test: True if the test is an integration test, False
         otherwise.
+      blocking: Run method will wait until pipeline execution is completed.
 
     Raises:
       ValueError: if either the runner or options argument is not of the
@@ -79,10 +81,17 @@ class TestPipeline(Pipeline):
     """
     self.is_integration_test = is_integration_test
     self.options_list = self._parse_test_option_args(argv)
+    self.blocking = blocking
     if options is None:
       options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
+  def run(self):
+    result = super(TestPipeline, self).run()
+    if self.blocking:
+      result.wait_until_finish()
+    return result
+
   def _parse_test_option_args(self, argv):
     """Parse value of command line argument: --test-pipeline-options to get
     pipeline options.
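
With this change a test can rely on run() to block, or pass blocking=False
to keep the old asynchronous behavior. A minimal illustrative sketch (the
test class and data below are invented; only the TestPipeline API from the
diff above and the assert_that/equal_to helpers in
apache_beam.transforms.util are assumed):

import unittest

import apache_beam as beam
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.util import assert_that, equal_to


class BlockingRunTest(unittest.TestCase):

  def test_blocking_by_default(self):
    p = TestPipeline()  # blocking=True is the default
    result = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
    assert_that(result, equal_to([2, 4, 6]))
    p.run()  # returns only after pipeline execution has completed

  def test_opt_out_of_blocking(self):
    p = TestPipeline(blocking=False)  # old asynchronous behavior
    result = p | beam.Create([1]) | beam.Map(lambda x: x + 1)
    assert_that(result, equal_to([2]))
    p.run().wait_until_finish()  # block explicitly when needed


if __name__ == '__main__':
  unittest.main()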


[4/6] beam git commit: Update tests to use TestPipeline()

Posted by ro...@apache.org.
Update tests to use TestPipeline()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ded9185
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ded9185
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ded9185

Branch: refs/heads/python-sdk
Commit: 4ded9185c063eae9f54ce457a1fa753679d1ce82
Parents: 703c1bc
Author: Ahmet Altay <al...@google.com>
Authored: Sun Jan 15 00:35:49 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:01:41 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py        | 13 +--
 .../apache_beam/tests/pipeline_verifiers.py     |  4 +-
 .../apache_beam/transforms/aggregator_test.py   |  3 +-
 .../apache_beam/transforms/combiners_test.py    | 26 +++---
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   | 93 ++++++++++----------
 .../apache_beam/transforms/window_test.py       | 10 +--
 .../transforms/write_ptransform_test.py         |  5 +-
 .../typehints/typed_pipeline_test.py            |  6 +-
 9 files changed, 81 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d6925d4..336bf54 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -25,6 +25,7 @@ from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
@@ -94,7 +95,7 @@ class PipelineTest(unittest.TestCase):
       self.leave_composite.append(transform_node)
 
   def test_create(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'label1' >> Create([1, 2, 3])
     assert_that(pcoll, equal_to([1, 2, 3]))
 
@@ -105,13 +106,13 @@ class PipelineTest(unittest.TestCase):
     pipeline.run()
 
   def test_create_singleton_pcollection(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
   def test_read(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
     assert_that(pcoll, equal_to([1, 2, 3]))
     pipeline.run()
@@ -136,7 +137,7 @@ class PipelineTest(unittest.TestCase):
     self.assertEqual(visitor.leave_composite[0].transform, transform)
 
   def test_apply_custom_transform(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
     result = pcoll | PipelineTest.CustomTransform()
     assert_that(result, equal_to([2, 3, 4]))
@@ -158,7 +159,7 @@ class PipelineTest(unittest.TestCase):
         'pvalue | "label" >> transform')
 
   def test_reuse_cloned_custom_transform_instance(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
     pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
@@ -207,7 +208,7 @@ class PipelineTest(unittest.TestCase):
     num_elements = 10
     num_maps = 100
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline(runner='DirectRunner')
 
     # Consumed memory should not be proportional to the number of maps.
     memory_threshold = (

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 6bf8d48..9b286a2 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -46,7 +46,7 @@ class PipelineStateMatcher(BaseMatcher):
     self.expected_state = expected_state
 
   def _matches(self, pipeline_result):
-    return pipeline_result.current_state() == self.expected_state
+    return pipeline_result.state == self.expected_state
 
   def describe_to(self, description):
     description \
@@ -56,7 +56,7 @@ class PipelineStateMatcher(BaseMatcher):
   def describe_mismatch(self, pipeline_result, mismatch_description):
     mismatch_description \
       .append_text("Test pipeline job terminated in state: ") \
-      .append_text(pipeline_result.current_state())
+      .append_text(pipeline_result.state)
 
 
 def retry_on_io_error_and_server_error(exception):
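
A hedged usage sketch for the matcher change above (the pipeline contents
are invented, and the apache_beam.runners.runner import path for
PipelineState is an assumption for this revision): with run() now blocking,
the PipelineResult is already in a terminal state when matched, and the
matcher reads the state property rather than calling current_state():

import apache_beam as beam
from hamcrest import assert_that as hc_assert_that

from apache_beam.runners.runner import PipelineState
from apache_beam.test_pipeline import TestPipeline
from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher

p = TestPipeline()
_ = p | beam.Create([1, 2, 3])  # any deferred work
result = p.run()  # blocks, so the job has terminated by this point
hc_assert_that(result, PipelineStateMatcher(PipelineState.DONE))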

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index e77dfba..d493c46 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -22,6 +22,7 @@ import unittest
 import apache_beam as beam
 from apache_beam.transforms import combiners
 from apache_beam.transforms.aggregator import Aggregator
+from apache_beam.test_pipeline import TestPipeline
 
 
 class AggregatorTest(unittest.TestCase):
@@ -63,7 +64,7 @@ class AggregatorTest(unittest.TestCase):
         for a in aggregators:
           context.aggregate_to(a, context.element)
 
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: disable=expression-not-assigned
     res = p.run()
     for (_, _, expected), a in zip(counter_types, aggregators):

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 72fce60..8a6d352 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -22,7 +22,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create
@@ -40,7 +40,7 @@ class CombineTest(unittest.TestCase):
     combine.TopCombineFn._MIN_BUFFER_OVERSIZE = 1
 
   def test_builtin_combines(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     mean = sum(vals) / float(len(vals))
@@ -62,7 +62,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_top(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     # A parameter we'll be sharing with a custom comparator.
     names = {0: 'zo',
@@ -201,7 +201,7 @@ class CombineTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_top_shorthands(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
     result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
@@ -222,7 +222,7 @@ class CombineTest(unittest.TestCase):
 
     # First test global samples (lots of them).
     for ix in xrange(300):
-      pipeline = Pipeline('DirectRunner')
+      pipeline = TestPipeline()
       pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
@@ -241,7 +241,7 @@ class CombineTest(unittest.TestCase):
       pipeline.run()
 
     # Now test per-key samples.
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start-perkey' >> Create(
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
@@ -258,7 +258,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_tuple_combine_fn(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (
         p
         | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
@@ -269,7 +269,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_tuple_combine_fn_without_defaults(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (
         p
         | Create([1, 1, 2, 3])
@@ -280,7 +280,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_to_list_and_to_dict(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     pcoll = pipeline | 'start' >> Create(the_list)
     result = pcoll | 'to list' >> combine.ToList()
@@ -292,7 +292,7 @@ class CombineTest(unittest.TestCase):
     assert_that(result, matcher([the_list]))
     pipeline.run()
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pairs = [(1, 2), (3, 4), (5, 6)]
     pcoll = pipeline | 'start-pairs' >> Create(pairs)
     result = pcoll | 'to dict' >> combine.ToDict()
@@ -306,12 +306,12 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_combine_globally_with_default(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
     p.run()
 
   def test_combine_globally_without_default(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = p | Create([]) | CombineGlobally(sum).without_defaults()
     assert_that(result, equal_to([]))
     p.run()
@@ -323,7 +323,7 @@ class CombineTest(unittest.TestCase):
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
 
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
     result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 006a937..b5ac64b 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -443,7 +443,7 @@ class PTransform(WithTypeHints, HasDisplayData):
       # Get a reference to the runners internal cache, otherwise runner may
       # clean it after run.
       cache = p.runner.cache
-      p.run()
+      p.run().wait_until_finish()
       return _MaterializePValues(cache).visit(result)
 
   def _extract_input_pvalues(self, pvalueish):

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 705e85e..58382e4 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -26,7 +26,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 import apache_beam.pvalue as pvalue
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
@@ -36,7 +36,6 @@ import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import TypeOptions
 
 
@@ -54,12 +53,12 @@ class PTransformTest(unittest.TestCase):
     self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
                      str(PTransform()))
 
-    pa = Pipeline('DirectRunner')
+    pa = TestPipeline()
     res = pa | 'a_label' >> beam.Create([1, 2])
     self.assertEqual('AppliedPTransform(a_label, Create)',
                      str(res.producer))
 
-    pc = Pipeline('DirectRunner')
+    pc = TestPipeline()
     res = pc | beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
@@ -67,7 +66,7 @@ class PTransformTest(unittest.TestCase):
         """<Create(PTransform) label=[Create] inputs=('ci',)>""",
         str(inputs_tr))
 
-    pd = Pipeline('DirectRunner')
+    pd = TestPipeline()
     res = pd | beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
@@ -111,7 +110,7 @@ class PTransformTest(unittest.TestCase):
       def process(self, context, addon):
         return [context.element + addon]
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
@@ -123,20 +122,20 @@ class PTransformTest(unittest.TestCase):
       def process(self, context):
         pass
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
       pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -145,7 +144,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_side_input_as_keyword_arg(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -154,7 +153,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
 
@@ -168,7 +167,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
 
@@ -182,7 +181,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_side_outputs_maintains_unique_name(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
     r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
@@ -195,7 +194,7 @@ class PTransformTest(unittest.TestCase):
     # iterable.
     def incorrect_par_do_fn(x):
       return x + 5
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
     pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions to a ParDo return
@@ -216,7 +215,7 @@ class PTransformTest(unittest.TestCase):
 
       def finish_bundle(self, c):
         yield 'finish'
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(MyDoFn())
 
@@ -231,7 +230,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_filter(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
     result = pcoll | beam.Filter(
         'filter', lambda x: x % 2 == 0)
@@ -257,7 +256,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
@@ -265,7 +264,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
@@ -273,7 +272,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(values)
     divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombineGlobally(
@@ -288,7 +287,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_combine_fn(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
@@ -299,7 +298,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_callable(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
@@ -309,7 +308,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_side_input_as_arg(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     divisor = pipeline | 'divisor' >> beam.Create([2])
@@ -322,7 +321,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
     result = pcoll | 'group' >> beam.GroupByKey()
@@ -336,7 +335,7 @@ class PTransformTest(unittest.TestCase):
       def partition_for(self, context, num_partitions, offset):
         return (context.element % 3) + offset
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
     partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
@@ -348,14 +347,14 @@ class PTransformTest(unittest.TestCase):
 
     # Check that a bad partition label will yield an error. For the
     # DirectRunner, this error manifests as an exception.
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
@@ -370,7 +369,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_partition_followed_by_flatten_and_groupbykey(self):
     """Regression test for an issue with how partitions are handled."""
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     contents = [('aa', 1), ('bb', 2), ('aa', 2)]
     created = pipeline | 'A' >> beam.Create(contents)
     partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
@@ -380,7 +379,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
@@ -388,7 +387,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     with self.assertRaises(ValueError):
       () | 'pipeline arg missing' >> beam.Flatten()
     result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
@@ -396,7 +395,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
@@ -417,7 +416,7 @@ class PTransformTest(unittest.TestCase):
       set([1, 2, 3]) | 'flatten' >> beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -429,7 +428,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_iterable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -442,7 +441,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_dict(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -454,7 +453,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
 
     with self.assertRaises(typehints.TypeCheckError) as e:
@@ -467,7 +466,7 @@ class PTransformTest(unittest.TestCase):
         'Tuple[TypeVariable[K], TypeVariable[V]]')
 
   def test_group_by_key_only_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
       pcolls | 'D' >> beam.GroupByKeyOnly()
@@ -478,7 +477,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_keys_and_values(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
     keys = pcoll.apply('keys', beam.Keys())
@@ -488,7 +487,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_kv_swap(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
     result = pcoll.apply('swap', beam.KvSwap())
@@ -496,7 +495,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_remove_duplicates(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
     result = pcoll.apply('nodupes', beam.RemoveDuplicates())
@@ -504,7 +503,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_chained_ptransforms(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
@@ -581,7 +580,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_chained_ptransforms(self):
     """Tests that chaining gets proper nesting."""
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     map1 = beam.Map('map1', lambda x: (x, 1))
     gbk = beam.GroupByKey('gbk')
     map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
@@ -594,7 +593,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_without_label(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
@@ -604,7 +603,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_with_label(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform('*custom*')
     result = pipeline.apply(custom, pcoll)
@@ -615,7 +614,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
@@ -624,7 +623,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_ptransform_using_decorator(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     sample = SamplePTransform('*sample*')
     _ = pcoll | sample
@@ -635,7 +634,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally('*sum*', sum)
     result = pcoll | combine
@@ -644,7 +643,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def check_label(self, ptransform, expected_label):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
     actual_label = sorted(pipeline.applied_labels - {'start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
@@ -728,7 +727,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                     '"%s" does not start with "%s"' % (msg, prefix))
 
   def setUp(self):
-    self.p = Pipeline(options=PipelineOptions([]))
+    self.p = TestPipeline()
 
   def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
     @with_input_types(int, int)

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 856d011..d4e8e25 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
 
 import unittest
 
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core
@@ -140,7 +140,7 @@ class WindowTest(unittest.TestCase):
             | Map(lambda x: WindowedValue((key, x), x, [])))
 
   def test_sliding_windows(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
     result = (pcoll
               | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
@@ -153,7 +153,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_sessions(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
     result = (pcoll
               | 'w' >> WindowInto(Sessions(10))
@@ -166,7 +166,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_value(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (p
               | 'start' >> Create([(k, k) for k in range(10)])
               | Map(lambda (x, t): TimestampedValue(x, t))
@@ -178,7 +178,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_with_combiners(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (p
               # Create some initial test values.
               | 'start' >> Create([(k, k) for k in range(10)])

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index f96dffb..3d7fbd9 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -22,10 +22,9 @@ import unittest
 import apache_beam as beam
 
 from apache_beam.io import iobase
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, is_empty
-from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class _TestSink(iobase.Sink):
@@ -99,7 +98,7 @@ class WriteTest(unittest.TestCase):
                       return_write_results=True):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
-    p = Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
 
     assert_that(result, is_empty())

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 8b5e3f4..35987b7 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -23,10 +23,10 @@ import unittest
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
 from apache_beam.utils.pipeline_options import OptionsContext
-from apache_beam.utils.pipeline_options import PipelineOptions
 
 # These test often construct a pipeline as value | PTransform to test side
 # effects (e.g. errors).
@@ -168,7 +168,7 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
       return s * times
-    p = beam.Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     main_input = p | beam.Create(['a', 'bb', 'c'])
     side_input = p | 'side' >> beam.Create([3])
     result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
@@ -183,7 +183,7 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, typehints.Iterable[str])
     def concat(glue, items):
       return glue.join(sorted(items))
-    p = beam.Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     main_input = p | beam.Create(['a', 'bb', 'c'])
     side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
     result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
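
The mechanical pattern applied throughout this commit, as an illustrative
before/after sketch (the pipeline contents are made up; TestPipeline is the
class introduced in 2e49f518 above):

# Before: tests constructed a runner directly, e.g.
#   from apache_beam.pipeline import Pipeline
#   pipeline = Pipeline('DirectRunner')
# After: runner and options come from --test-pipeline-options, and
# run() blocks until the pipeline finishes.
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import Create
from apache_beam.transforms.util import assert_that, equal_to

pipeline = TestPipeline()
pcoll = pipeline | 'start' >> Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))
pipeline.run()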


[3/6] beam git commit: Changed tests in examples/ and io/ to use TestPipeline.

Posted by ro...@apache.org.
Changed tests in examples/ and io/ to use TestPipeline.

Removed wait_until_finish except in a few cases:
- tfidf: kept as an example usage.
- some examples in cookbook: they run the examples directly,
  and we did not want to update those examples to use TestPipeline.
- some snippets: where the pipeline creation is part of the snippet
  and was not easy to override.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/703c1bc1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/703c1bc1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/703c1bc1

Branch: refs/heads/python-sdk
Commit: 703c1bc18ac28aa1aa75a9359c1bcc4fbdd28f35
Parents: 2e49f51
Author: Ahmet Altay <al...@google.com>
Authored: Sat Jan 14 23:49:25 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:37 2017 -0800

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      |  3 +-
 .../examples/complete/estimate_pi_test.py       |  4 +-
 .../complete/juliaset/juliaset/juliaset.py      |  2 +-
 .../complete/juliaset/juliaset/juliaset_test.py |  2 +-
 .../examples/complete/juliaset/juliaset_main.py |  2 +-
 .../apache_beam/examples/complete/tfidf.py      |  3 +-
 .../apache_beam/examples/complete/tfidf_test.py |  3 +-
 .../examples/complete/top_wikipedia_sessions.py |  2 +-
 .../complete/top_wikipedia_sessions_test.py     |  3 +-
 .../cookbook/bigquery_side_input_test.py        |  3 +-
 .../cookbook/bigquery_tornadoes_test.py         |  2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  2 +-
 .../examples/cookbook/bigshuffle_test.py        |  2 +-
 .../examples/cookbook/coders_test.py            |  3 +-
 .../examples/cookbook/combiners_test.py         |  5 +-
 .../examples/cookbook/custom_ptransform.py      |  4 +-
 .../examples/cookbook/custom_ptransform_test.py |  3 +-
 .../examples/cookbook/datastore_wordcount.py    |  7 ++-
 .../examples/cookbook/filters_test.py           |  3 +-
 .../examples/cookbook/group_with_coder.py       |  2 +-
 .../examples/cookbook/group_with_coder_test.py  |  4 +-
 .../examples/cookbook/mergecontacts.py          |  2 +-
 .../examples/cookbook/mergecontacts_test.py     |  3 +-
 .../examples/cookbook/multiple_output_pardo.py  |  4 +-
 .../cookbook/multiple_output_pardo_test.py      |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 65 ++++++++++----------
 .../examples/snippets/snippets_test.py          | 12 ++--
 .../apache_beam/examples/streaming_wordcap.py   |  2 +-
 .../apache_beam/examples/streaming_wordcount.py |  2 +-
 sdks/python/apache_beam/examples/wordcount.py   |  1 +
 .../apache_beam/examples/wordcount_debugging.py |  4 +-
 .../apache_beam/examples/wordcount_minimal.py   |  4 +-
 .../python/apache_beam/io/concat_source_test.py |  3 +-
 .../apache_beam/io/filebasedsource_test.py      | 19 +++---
 sdks/python/apache_beam/io/fileio_test.py       | 11 ++--
 sdks/python/apache_beam/io/sources_test.py      |  3 +-
 sdks/python/apache_beam/io/textio_test.py       | 26 ++++----
 sdks/python/tox.ini                             |  1 +
 38 files changed, 123 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 5ed4fb5..edf95f0 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,6 +21,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -30,7 +31,7 @@ class AutocompleteTest(unittest.TestCase):
   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
 
   def test_top_prefixes(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     words = p | beam.Create(self.WORDS)
     result = words | autocomplete.TopPerPrefix(5)
     # values must be hashable for now

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 0440ecc..10010cb 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -20,8 +20,8 @@
 import logging
 import unittest
 
-import apache_beam as beam
 from apache_beam.examples.complete import estimate_pi
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import DataflowAssertException
 
@@ -38,7 +38,7 @@ def in_between(lower, upper):
 class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
 
     # Note: Probabilistically speaking this test can fail with a probability

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 45fc1fb..30883dc 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -113,7 +113,7 @@ def run(argv=None):  # pylint: disable=missing-docstring
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
    | WriteToText(known_args.coordinate_output))
   # pylint: enable=expression-not-assigned
-  p.run()
+  return p.run()
 
   # Optionally render the image and save it to a file.
   # TODO(silviuc): Add this functionality.

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
index c13e857..c254eb4 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
@@ -52,7 +52,7 @@ class JuliaSetTest(unittest.TestCase):
     if image_file_name is not None:
       args.append('--image_output=%s' % image_file_name)
 
-    juliaset.run(args)
+    juliaset.run(args).wait_until_finish()
 
   def test_output_file_format(self):
     grid_size = 5

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
index d6ba064..0db5431 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -38,7 +38,7 @@ an example:
 python juliaset_main.py \
   --job_name juliaset-$USER \
   --project YOUR-PROJECT \
-  --runner BlockingDataflowRunner \
+  --runner DataflowRunner \
   --setup_file ./setup.py \
   --staging_location gs://YOUR-BUCKET/juliaset/staging \
   --temp_location gs://YOUR-BUCKET/juliaset/temp \

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 59b9d6f..4d6e0d3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -200,7 +200,8 @@ def run(argv=None):
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
   output | 'write' >> WriteToText(known_args.output)
-  p.run()
+  # Execute the pipeline and wait until it is completed.
+  p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index deda4cb..404ab44 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -25,6 +25,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import tfidf
+from apache_beam.test_pipeline import TestPipeline
 
 
 EXPECTED_RESULTS = set([
@@ -47,7 +48,7 @@ class TfIdfTest(unittest.TestCase):
       f.write(contents)
 
   def test_tfidf_transform(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     uri_to_line = p | beam.Create(
         'create sample',
         [('1.txt', 'abc def ghi'),

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 2dea642..4920813 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -31,7 +31,7 @@ pipeline configuration in addition to the above:
   --project YOUR_PROJECT_ID
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMPORARY_DIRECTORY
-  --runner BlockingDataflowRunner
+  --runner DataflowRunner
 
 The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
 overridden with --input.

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 1d25807..9b9d9b1 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -23,6 +23,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.complete import top_wikipedia_sessions
+from apache_beam.test_pipeline import TestPipeline
 
 
 class ComputeTopSessionsTest(unittest.TestCase):
@@ -49,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
   ]
 
   def test_compute_top_sessions(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 97c41d6..926f141 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -22,12 +22,13 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_side_input
+from apache_beam.test_pipeline import TestPipeline
 
 
 class BigQuerySideInputTest(unittest.TestCase):
 
   def test_create_groups(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
 
     group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
     corpus_pcoll = p | beam.Create('create_corpus',

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2cb2c45..0fabe3f 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -36,7 +36,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
     results = bigquery_tornadoes.count_tornadoes(rows)
     beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
                                              {'month': 2, 'tornado_count': 1}]))
-    p.run()
+    p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index ceeefd6..b5eacce 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -87,7 +87,7 @@ def run(argv=None):
         known_args.checksum_output + '-output')
 
   # Actually run the pipeline (all operations above are deferred).
-  p.run()
+  return p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
index d73c976..60b6acc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py
@@ -38,7 +38,7 @@ class BigShuffleTest(unittest.TestCase):
     bigshuffle.run([
         '--input=%s*' % temp_path,
         '--output=%s.result' % temp_path,
-        '--checksum_output=%s.checksum' % temp_path])
+        '--checksum_output=%s.checksum' % temp_path]).wait_until_finish()
     # Parse result file and compare.
     results = []
     with open(temp_path + '.result-00000-of-00001') as result_file:

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index ec0848f..4a92abb 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -22,6 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import coders
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -34,7 +35,7 @@ class CodersTest(unittest.TestCase):
       {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
 
   def test_compute_points(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
     result = (records
               | 'points' >> beam.FlatMap(coders.compute_points)

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 18bd3bc..a8ed555 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -27,6 +27,7 @@ import logging
 import unittest
 
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 
 
 class CombinersTest(unittest.TestCase):
@@ -44,7 +45,7 @@ class CombinersTest(unittest.TestCase):
     can be used.
     """
     result = (
-        beam.Pipeline(runner=beam.runners.DirectRunner())
+        TestPipeline()
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(sum))
 
@@ -60,7 +61,7 @@ class CombinersTest(unittest.TestCase):
       return result
 
     result = (
-        beam.Pipeline(runner=beam.runners.DirectRunner())
+        TestPipeline()
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(multiply))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index cfbb99d..67d1ff8 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -51,7 +51,7 @@ def run_count1(known_args, options):
   (p | beam.io.ReadFromText(known_args.input)
    | Count1()
    | beam.io.WriteToText(known_args.output))
-  p.run()
+  p.run().wait_until_finish()
 
 
 @beam.ptransform_fn
@@ -70,7 +70,7 @@ def run_count2(known_args, options):
   (p | ReadFromText(known_args.input)
    | Count2()  # pylint: disable=no-value-for-parameter
    | WriteToText(known_args.output))
-  p.run()
+  p.run().wait_until_finish()
 
 
 @beam.ptransform_fn

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 91309ae..cd1c04a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -22,6 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -39,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
     self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
 
   def run_pipeline(self, count_implementation, factor=1):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 8f68fb4..25abb3e 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -141,7 +141,7 @@ def write_to_datastore(project, user_options, pipeline_options):
    | 'write to datastore' >> WriteToDatastore(project))
 
   # Actually run the pipeline (all operations above are deferred).
-  p.run()
+  p.run().wait_until_finish()
 
 
 def make_ancestor_query(kind, namespace, ancestor):
@@ -192,7 +192,10 @@ def read_from_datastore(project, user_options, pipeline_options):
                                           num_shards=user_options.num_shards)
 
   # Actually run the pipeline (all operations above are deferred).
-  return p.run()
+  result = p.run()
+  # Wait until completion; the main thread will access post-completion job results.
+  result.wait_until_finish()
+  return result
 
 
 def run(argv=None):

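The read_from_datastore() change above captures the common shape of the new
API: run the pipeline, block until it reaches a terminal state, then hand the
completed result back to the caller. A minimal sketch of that pattern (the
run_and_wait helper is hypothetical, not part of the SDK):

    def run_and_wait(pipeline):
      # All transforms applied so far are deferred; run() submits the job.
      result = pipeline.run()
      # Block until the job finishes so the caller can safely read
      # post-completion information (final state, aggregated values, ...).
      result.wait_until_finish()
      return result
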
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 81dd30f..28bb1e1 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -22,6 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.examples.cookbook import filters
+from apache_beam.test_pipeline import TestPipeline
 
 
 class FiltersTest(unittest.TestCase):
@@ -35,7 +36,7 @@ class FiltersTest(unittest.TestCase):
       ]
 
   def _get_result_for_month(self, month):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     rows = (p | 'create' >> beam.Create(self.input_data))
 
     results = filters.filter_cold_days(rows, month)

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 78540d1..f6f2108 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -114,7 +114,7 @@ def run(argv=sys.argv[1:]):
    | beam.CombinePerKey(sum)
    | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
    | WriteToText(known_args.output))
-  p.run()
+  return p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 268ba8d..4e87966 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase):
     temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
     group_with_coder.run([
         '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
+        '--output=%s.result' % temp_path]).wait_until_finish()
     # Parse result file and compare.
     results = []
     with open(temp_path + '.result-00000-of-00001') as result_file:
@@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase):
     group_with_coder.run([
         '--no_pipeline_type_check',
         '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
+        '--output=%s.result' % temp_path]).wait_until_finish()
     # Parse result file and compare.
     results = []
     with open(temp_path + '.result-00000-of-00001') as result_file:

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 2475e02..6906ae4 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -124,7 +124,7 @@ def run(argv=None, assert_results=None):
     beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
                      label='assert:nomads')
   # Execute pipeline.
-  p.run()
+  return p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index b3be0dd..09f71d3 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -107,12 +107,13 @@ class MergeContactsTest(unittest.TestCase):
 
     result_prefix = self.create_temp_file('')
 
-    mergecontacts.run([
+    result = mergecontacts.run([
         '--input_email=%s' % path_email,
         '--input_phone=%s' % path_phone,
         '--input_snailmail=%s' % path_snailmail,
         '--output_tsv=%s.tsv' % result_prefix,
         '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
+    result.wait_until_finish()
 
     with open('%s.tsv-00000-of-00001' % result_prefix) as f:
       contents = f.read()

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 3acebc6..a877a1d 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -40,7 +40,7 @@ pipeline configuration:
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMP_DIRECTORY
   --job_name YOUR_JOB_NAME
-  --runner BlockingDataflowRunner
+  --runner DataflowRunner
 
 and an output prefix on GCS:
   --output gs://YOUR_OUTPUT_PREFIX
@@ -173,7 +173,7 @@ def run(argv=None):
    | 'count words' >> CountWords()
    | 'write words' >> WriteToText(known_args.output + '-words'))
 
-  p.run()
+  return p.run()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 3ddd668..2c9111c 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase):
 
     multiple_output_pardo.run([
         '--input=%s*' % temp_path,
-        '--output=%s' % result_prefix])
+        '--output=%s' % result_prefix]).wait_until_finish()
 
     expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
     with open(result_prefix + '-chars-00000-of-00001') as f:

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 2eadc44..0cba1af 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -31,6 +31,7 @@ string. The tags can contain only letters, digits and _.
 """
 
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 
 # Quiet some pylint warnings that happen because of the somewhat special
 # format for the code snippets.
@@ -88,6 +89,8 @@ def construct_pipeline(renames):
   p = beam.Pipeline(options=PipelineOptions())
   # [END pipelines_constructing_creating]
 
+  p = TestPipeline()  # Use TestPipeline for testing.
+
   # [START pipelines_constructing_reading]
   lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
   # [END pipelines_constructing_reading]
@@ -106,8 +109,9 @@ def construct_pipeline(renames):
   p.visit(SnippetUtils.RenameFiles(renames))
 
   # [START pipelines_constructing_running]
-  p.run()
+  result = p.run()
   # [END pipelines_constructing_running]
+  result
 
 
 def model_pipelines(argv):
@@ -144,8 +148,9 @@ def model_pipelines(argv):
    | beam.combiners.Count.PerKey()
    | beam.io.WriteToText(my_options.output))
 
-  p.run()
+  result = p.run()
   # [END model_pipelines]
+  result.wait_until_finish()
 
 
 def model_pcollection(argv):
@@ -175,8 +180,9 @@ def model_pcollection(argv):
        'Or to take arms against a sea of troubles, '])
    | beam.io.WriteToText(my_options.output))
 
-  p.run()
+  result = p.run()
   # [END model_pcollection]
+  result.wait_until_finish()
 
 
 def pipeline_options_remote(argv):
@@ -206,8 +212,7 @@ def pipeline_options_remote(argv):
   options = PipelineOptions(flags=argv)
 
   # For Cloud execution, set the Cloud Platform project, job_name,
-  # staging location, temp_location and specify DataflowRunner or
-  # BlockingDataflowRunner.
+  # staging location, temp_location and specify DataflowRunner.
   google_cloud_options = options.view_as(GoogleCloudOptions)
   google_cloud_options.project = 'my-project-id'
   google_cloud_options.job_name = 'myjob'
@@ -223,9 +228,7 @@ def pipeline_options_remote(argv):
   my_input = my_options.input
   my_output = my_options.output
 
-  # Overriding the runner for tests.
-  options.view_as(StandardOptions).runner = 'DirectRunner'
-  p = Pipeline(options=options)
+  p = TestPipeline()  # Use TestPipeline for testing.
 
   lines = p | beam.io.ReadFromText(my_input)
   lines | beam.io.WriteToText(my_output)
@@ -265,6 +268,7 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
+  p = TestPipeline()  # Use TestPipeline for testing.
   lines = p | beam.io.ReadFromText(my_input)
   lines | beam.io.WriteToText(my_output)
   p.run()
@@ -288,7 +292,7 @@ def pipeline_options_command_line(argv):
   lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
   # [END pipeline_options_command_line]
 
-  p.run()
+  p.run().wait_until_finish()
 
 
 def pipeline_logging(lines, output):
@@ -296,7 +300,6 @@ def pipeline_logging(lines, output):
 
   import re
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START pipeline_logging]
   # import Python logging module.
@@ -316,7 +319,7 @@ def pipeline_logging(lines, output):
   # Remaining WordCount example code ...
   # [END pipeline_logging]
 
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   (p
    | beam.Create(lines)
    | beam.ParDo(ExtractWordsFn())
@@ -372,7 +375,7 @@ def pipeline_monitoring(renames):
 
   pipeline_options = PipelineOptions()
   options = pipeline_options.view_as(WordCountOptions)
-  p = beam.Pipeline(options=pipeline_options)
+  p = TestPipeline()  # Use TestPipeline for testing.
 
   # [START pipeline_monitoring_execution]
   (p
@@ -405,7 +408,7 @@ def examples_wordcount_minimal(renames):
   google_cloud_options.job_name = 'myjob'
   google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
   google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
-  options.view_as(StandardOptions).runner = 'BlockingDataflowRunner'
+  options.view_as(StandardOptions).runner = 'DataflowRunner'
   # [END examples_wordcount_minimal_options]
 
   # Run it locally for testing.
@@ -441,8 +444,9 @@ def examples_wordcount_minimal(renames):
   p.visit(SnippetUtils.RenameFiles(renames))
 
   # [START examples_wordcount_minimal_run]
-  p.run()
+  result = p.run()
   # [END examples_wordcount_minimal_run]
+  result.wait_until_finish()
 
 
 def examples_wordcount_wordcount(renames):
@@ -497,7 +501,7 @@ def examples_wordcount_wordcount(renames):
 
   formatted |  beam.io.WriteToText('gs://my-bucket/counts.txt')
   p.visit(SnippetUtils.RenameFiles(renames))
-  p.run()
+  p.run().wait_until_finish()
 
 
 def examples_wordcount_debugging(renames):
@@ -505,7 +509,6 @@ def examples_wordcount_debugging(renames):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START example_wordcount_debugging_logging]
   # [START example_wordcount_debugging_aggregators]
@@ -546,7 +549,7 @@ def examples_wordcount_debugging(renames):
   # [END example_wordcount_debugging_logging]
   # [END example_wordcount_debugging_aggregators]
 
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   filtered_words = (
       p
       | beam.io.ReadFromText(
@@ -649,7 +652,7 @@ def model_custom_source(count):
       lines, beam.equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
-  p.run()
+  p.run().wait_until_finish()
 
   # We recommend users to start Source classes with an underscore to discourage
   # using the Source class directly when a PTransform for the source is
@@ -679,7 +682,7 @@ def model_custom_source(count):
       lines, beam.equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
-  p.run()
+  p.run().wait_until_finish()
 
 
 def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
@@ -781,7 +784,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
                                    final_table_name))
   # [END model_custom_sink_use_new_sink]
 
-  p.run()
+  p.run().wait_until_finish()
 
   # We recommend users to start Sink class names with an underscore to
   # discourage using the Sink class directly when a PTransform for the sink is
@@ -812,7 +815,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
                       'http://url_to_simple_kv/', final_table_name)
   # [END model_custom_sink_use_ptransform]
 
-  p.run()
+  p.run().wait_until_finish()
 
 
 def model_textio(renames):
@@ -841,7 +844,7 @@ def model_textio(renames):
   # [END model_textio_write]
 
   p.visit(SnippetUtils.RenameFiles(renames))
-  p.run()
+  p.run().wait_until_finish()
 
 
 def model_datastoreio():
@@ -954,8 +957,7 @@ def model_composite_transform_example(contents, output_path):
   # [END composite_ptransform_apply_method]
   # [END composite_transform_example]
 
-  from apache_beam.utils.pipeline_options import PipelineOptions
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   (p
    | beam.Create(contents)
    | CountWords()
@@ -967,8 +969,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
   """Merging a PCollection with Flatten."""
   some_hash_fn = lambda s: ord(s[0])
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
 
   # Partition into deciles
@@ -1005,8 +1006,7 @@ def model_multiple_pcollections_partition(contents, output_path):
     """Assume i in [0,100)."""
     return i
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
 
   students = p | beam.Create(contents)
 
@@ -1032,8 +1032,7 @@ def model_group_by_key(contents, output_path):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   words_and_counts = (
       p
       | beam.Create(contents)
@@ -1056,8 +1055,7 @@ def model_group_by_key(contents, output_path):
 def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   """Applying a CoGroupByKey Transform to a tuple."""
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   # [START model_group_by_key_cogroupbykey_tuple]
   # Each data set is represented by key-value pairs in separate PCollections.
   # Both data sets share a common key type (in this example str).
@@ -1094,9 +1092,8 @@ def model_join_using_side_inputs(
 
   import apache_beam as beam
   from apache_beam.pvalue import AsIter
-  from apache_beam.utils.pipeline_options import PipelineOptions
 
-  p = beam.Pipeline(options=PipelineOptions())
+  p = TestPipeline()  # Use TestPipeline for testing.
   # [START model_join_using_side_inputs]
   # This code performs a join by receiving the set of names as an input and
   # passing PCollections that contain emails and phone numbers as side inputs

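All of the snippet functions above now build on TestPipeline, which reads its
runner and options from --test-pipeline-options (falling back to the direct
runner) and whose run() blocks until completion. A hedged sketch of the idiom,
assuming only the imports already shown in these diffs:

    import apache_beam as beam
    from apache_beam.test_pipeline import TestPipeline
    from apache_beam.transforms.util import assert_that
    from apache_beam.transforms.util import equal_to

    p = TestPipeline()  # Use TestPipeline for testing.
    pcoll = p | beam.Create([1, 2, 3])
    assert_that(pcoll, equal_to([1, 2, 3]))
    p.run()  # Blocks by default, so the assertion has run when this returns.
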
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ffe0f58..34863f1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -20,7 +20,6 @@
 import glob
 import logging
 import os
-import sys
 import tempfile
 import unittest
 import uuid
@@ -35,6 +34,7 @@ from apache_beam.utils.pipeline_options import TypeOptions
 from apache_beam.examples.snippets import snippets
 
 # pylint: disable=expression-not-assigned
+from apache_beam.test_pipeline import TestPipeline
 
 
 class ParDoTest(unittest.TestCase):
@@ -110,7 +110,7 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
 
     # [START model_pardo_side_input]
@@ -228,7 +228,7 @@ class ParDoTest(unittest.TestCase):
 class TypeHintsTest(unittest.TestCase):
 
   def test_bad_types(self):
-    p = beam.Pipeline('DirectRunner', argv=sys.argv)
+    p = TestPipeline()
     evens = None  # pylint: disable=unused-variable
 
     # [START type_hints_missing_define_numbers]
@@ -290,7 +290,7 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_off(self):
     # pylint: disable=expression-not-assigned
-    p = beam.Pipeline('DirectRunner', argv=sys.argv)
+    p = TestPipeline()
     # [START type_hints_runtime_off]
     p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
     p.run()
@@ -298,7 +298,7 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_on(self):
     # pylint: disable=expression-not-assigned
-    p = beam.Pipeline('DirectRunner', argv=sys.argv)
+    p = TestPipeline()
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_runtime_on]
       p.options.view_as(TypeOptions).runtime_type_check = True
@@ -307,7 +307,7 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     lines = (p | beam.Create(
         ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d25ec3e..d0cc8a2 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -56,7 +56,7 @@ def run(argv=None):
   transformed | beam.io.Write(
       beam.io.PubSubSink(known_args.output_topic))
 
-  p.run()
+  p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 35c1abb..adfc33d 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -66,7 +66,7 @@ def run(argv=None):
   transformed | beam.io.Write(
       'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
 
-  p.run()
+  p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 211211d..92929af 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -97,6 +97,7 @@ def run(argv=None):
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()
+  result.wait_until_finish()
   empty_line_values = result.aggregated_values(empty_line_aggregator)
   logging.info('number of empty lines: %d', sum(empty_line_values.values()))
   word_length_values = result.aggregated_values(average_word_size_aggregator)

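The wait inserted above matters because aggregated_values() reads from the
finished job; querying before the terminal state could return partial counts.
A short sketch of the resulting order of operations (aggregator names as in
the diff):

    result = p.run()
    # Only after the job completes are the aggregator totals final.
    result.wait_until_finish()
    empty_line_values = result.aggregated_values(empty_line_aggregator)
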
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 20d1c2f..ac13f35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -32,7 +32,7 @@ pipeline configuration::
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMP_DIRECTORY
   --job_name YOUR_JOB_NAME
-  --runner BlockingDataflowRunner
+  --runner DataflowRunner
 
 and an output prefix on GCS::
 
@@ -151,7 +151,7 @@ def run(argv=None):
             | 'write' >> WriteToText(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
-  p.run()
+  p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 18595d0..b80ed84 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -73,7 +73,7 @@ def run(argv=None):
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
   pipeline_args.extend([
-      # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowRunner to
+      # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
       # run your pipeline on the Google Cloud Dataflow Service.
       '--runner=DirectRunner',
       # CHANGE 3/5: Your project ID is required in order to run your pipeline on
@@ -113,7 +113,7 @@ def run(argv=None):
   output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
-  p.run()
+  p.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 3ff2529..2cc4684 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -26,6 +26,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.io.concat_source import ConcatSource
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -212,7 +213,7 @@ class ConcatSourceTest(unittest.TestCase):
                            RangeSource(10, 100),
                            RangeSource(100, 1000),
                           ])
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Read(source)
     assert_that(pcoll, equal_to(range(1000)))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 8f12627..7481c4c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -38,6 +38,7 @@ from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
@@ -364,7 +365,7 @@ class TestFileBasedSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def _run_source_test(self, pattern, expected_data, splittable=True):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         pattern, splittable=splittable))
     assert_that(pcoll, equal_to(expected_data))
@@ -404,7 +405,7 @@ class TestFileBasedSource(unittest.TestCase):
     with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
@@ -419,7 +420,7 @@ class TestFileBasedSource(unittest.TestCase):
     with gzip.GzipFile(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
@@ -437,7 +438,7 @@ class TestFileBasedSource(unittest.TestCase):
       compressed_chunks.append(
           compressobj.compress('\n'.join(c)) + compressobj.flush())
     file_pattern = write_prepared_pattern(compressed_chunks)
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
@@ -456,7 +457,7 @@ class TestFileBasedSource(unittest.TestCase):
         f.write('\n'.join(c))
       compressed_chunks.append(out.getvalue())
     file_pattern = write_prepared_pattern(compressed_chunks)
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
@@ -471,7 +472,7 @@ class TestFileBasedSource(unittest.TestCase):
     with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -485,7 +486,7 @@ class TestFileBasedSource(unittest.TestCase):
     with gzip.GzipFile(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -504,7 +505,7 @@ class TestFileBasedSource(unittest.TestCase):
       compressed_chunks.append(out.getvalue())
     file_pattern = write_prepared_pattern(
         compressed_chunks, suffixes=['.gz']*len(chunks))
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -526,7 +527,7 @@ class TestFileBasedSource(unittest.TestCase):
         chunks_to_write.append('\n'.join(c))
     file_pattern = write_prepared_pattern(chunks_to_write,
                                           suffixes=(['.gz', '']*3))
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         compression_type=fileio.CompressionTypes.AUTO))

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 68e2bce..8842369 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -34,6 +34,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
@@ -760,7 +761,7 @@ class TestNativeTextFileSink(unittest.TestCase):
       self.assertEqual(f.read().splitlines(), [])
 
   def test_write_native(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -773,7 +774,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_native_auto_compression(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
@@ -788,7 +789,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_native_auto_compression_unsharded(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
@@ -880,7 +881,7 @@ class TestFileSink(unittest.TestCase):
     temp_path = tempfile.NamedTemporaryFile().name
     sink = MyFileSink(
         temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     p.run()
     self.assertEqual(
@@ -894,7 +895,7 @@ class TestFileSink(unittest.TestCase):
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
 
     p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index f0f2046..dc0fd54 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -98,7 +99,7 @@ class SourcesTest(unittest.TestCase):
 
   def test_run_direct(self):
     file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Read(LineSource(file_name))
     assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 877e190..07ab4cc 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -43,6 +43,8 @@ from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
 from apache_beam.io.fileio import CompressionTypes
 
+from apache_beam.test_pipeline import TestPipeline
+
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
@@ -304,7 +306,7 @@ class TextSourceTest(unittest.TestCase):
   def test_dataflow_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
@@ -319,7 +321,7 @@ class TextSourceTest(unittest.TestCase):
 
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(file_name, coder=DummyCoder())
     assert_that(pcoll, equal_to([record * 2 for record in expected_data]))
     pipeline.run()
@@ -327,7 +329,7 @@ class TextSourceTest(unittest.TestCase):
   def test_dataflow_file_pattern(self):
     pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
     assert len(expected_data) == 40
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(pattern)
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
@@ -339,7 +341,7 @@ class TextSourceTest(unittest.TestCase):
     with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
@@ -351,7 +353,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
@@ -363,7 +365,7 @@ class TextSourceTest(unittest.TestCase):
     with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         compression_type=CompressionTypes.BZIP2)
@@ -377,7 +379,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         0, CompressionTypes.GZIP,
@@ -392,7 +394,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         0, CompressionTypes.GZIP,
@@ -425,7 +427,7 @@ class TextSourceTest(unittest.TestCase):
   def test_read_gzip_empty_file(self):
     filename = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template).name
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> ReadFromText(
         filename,
         0, CompressionTypes.GZIP,
@@ -521,7 +523,7 @@ class TextSinkTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_write_dataflow(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path)  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -534,7 +536,7 @@ class TextSinkTest(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz')  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -547,7 +549,7 @@ class TextSinkTest(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression_unsharded(self):
-    pipeline = beam.Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='')  # pylint: disable=expression-not-assigned
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 0110273..fa78d6f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -25,6 +25,7 @@ select = E3
 
 [testenv:py27]
 deps=
+  nose
   pep8
   pylint
 commands =


[5/6] beam git commit: Add dependency comments to tox file.

Posted by ro...@apache.org.
Add dependency comments to tox file.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2197009d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2197009d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2197009d

Branch: refs/heads/python-sdk
Commit: 2197009d4048fb0b76454c479171d1ffd2d57844
Parents: 4ded918
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jan 17 15:41:45 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:01:41 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/snippets/snippets.py | 3 +--
 sdks/python/tox.ini                                   | 2 ++
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2197009d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0cba1af..ff240eb 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -109,9 +109,8 @@ def construct_pipeline(renames):
   p.visit(SnippetUtils.RenameFiles(renames))
 
   # [START pipelines_constructing_running]
-  result = p.run()
+  p.run()
   # [END pipelines_constructing_running]
-  result
 
 
 def model_pipelines(argv):

http://git-wip-us.apache.org/repos/asf/beam/blob/2197009d/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index fa78d6f..20afa58 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -24,6 +24,8 @@ envlist = py27
 select = E3
 
 [testenv:py27]
+# autocomplete_test depends on nose when invoked directly.
+# run_pylint.sh depends on pep8 and pylint.
 deps=
   nose
   pep8


[6/6] beam git commit: Closes #1762

Posted by ro...@apache.org.
Closes #1762


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36a7d349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36a7d349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36a7d349

Branch: refs/heads/python-sdk
Commit: 36a7d3491b4929f7539f2aa620b4ad25865036f6
Parents: f25c0e4 2197009
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jan 18 10:10:50 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:10:50 2017 -0800

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      |  3 +-
 .../examples/complete/estimate_pi_test.py       |  4 +-
 .../complete/juliaset/juliaset/juliaset.py      |  2 +-
 .../complete/juliaset/juliaset/juliaset_test.py |  2 +-
 .../examples/complete/juliaset/juliaset_main.py |  2 +-
 .../apache_beam/examples/complete/tfidf.py      |  3 +-
 .../apache_beam/examples/complete/tfidf_test.py |  3 +-
 .../examples/complete/top_wikipedia_sessions.py |  2 +-
 .../complete/top_wikipedia_sessions_test.py     |  3 +-
 .../cookbook/bigquery_side_input_test.py        |  3 +-
 .../cookbook/bigquery_tornadoes_test.py         |  2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  2 +-
 .../examples/cookbook/bigshuffle_test.py        |  2 +-
 .../examples/cookbook/coders_test.py            |  3 +-
 .../examples/cookbook/combiners_test.py         |  5 +-
 .../examples/cookbook/custom_ptransform.py      |  4 +-
 .../examples/cookbook/custom_ptransform_test.py |  3 +-
 .../examples/cookbook/datastore_wordcount.py    |  7 +-
 .../examples/cookbook/filters_test.py           |  3 +-
 .../examples/cookbook/group_with_coder.py       |  2 +-
 .../examples/cookbook/group_with_coder_test.py  |  4 +-
 .../examples/cookbook/mergecontacts.py          |  2 +-
 .../examples/cookbook/mergecontacts_test.py     |  3 +-
 .../examples/cookbook/multiple_output_pardo.py  |  4 +-
 .../cookbook/multiple_output_pardo_test.py      |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 62 ++++++-------
 .../examples/snippets/snippets_test.py          | 12 +--
 .../apache_beam/examples/streaming_wordcap.py   |  2 +-
 .../apache_beam/examples/streaming_wordcount.py |  2 +-
 sdks/python/apache_beam/examples/wordcount.py   |  1 +
 .../apache_beam/examples/wordcount_debugging.py |  4 +-
 .../apache_beam/examples/wordcount_minimal.py   |  4 +-
 .../python/apache_beam/io/concat_source_test.py |  3 +-
 .../apache_beam/io/filebasedsource_test.py      | 19 ++--
 sdks/python/apache_beam/io/fileio_test.py       | 11 +--
 sdks/python/apache_beam/io/sources_test.py      |  3 +-
 sdks/python/apache_beam/io/textio_test.py       | 26 +++---
 sdks/python/apache_beam/pipeline.py             |  2 +-
 sdks/python/apache_beam/pipeline_test.py        | 13 +--
 .../apache_beam/runners/dataflow_runner.py      | 60 +++++++++----
 .../apache_beam/runners/direct/direct_runner.py | 15 ++--
 sdks/python/apache_beam/runners/runner.py       | 35 +++++++-
 sdks/python/apache_beam/runners/runner_test.py  |  1 +
 .../apache_beam/runners/template_runner_test.py |  4 +-
 .../runners/test/test_dataflow_runner.py        |  4 +-
 sdks/python/apache_beam/test_pipeline.py        | 11 ++-
 .../apache_beam/tests/pipeline_verifiers.py     |  4 +-
 .../apache_beam/transforms/aggregator_test.py   |  3 +-
 .../apache_beam/transforms/combiners_test.py    | 26 +++---
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   | 93 ++++++++++----------
 .../apache_beam/transforms/window_test.py       | 10 +--
 .../transforms/write_ptransform_test.py         |  5 +-
 .../typehints/typed_pipeline_test.py            |  6 +-
 .../apache_beam/utils/pipeline_options.py       |  3 +-
 sdks/python/run_postcommit.sh                   |  2 +-
 sdks/python/tox.ini                             |  3 +
 57 files changed, 304 insertions(+), 222 deletions(-)
----------------------------------------------------------------------



[2/6] beam git commit: Implement wait_until_finish method for existing runners.

Posted by ro...@apache.org.
Implement wait_until_finish method for existing runners.

Also defines the not-yet-implemented cancel() method and updates existing
usages to use wait_until_finish() instead of blocking runners.

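In miniature, the migration this commit performs across the examples and
tests (a hedged sketch; BlockingDataflowRunner is the pre-change spelling
that the later diffs delete):

    # Before: blocking was a property of the runner choice.
    #   options.view_as(StandardOptions).runner = 'BlockingDataflowRunner'
    #   p.run()

    # After: run() returns a PipelineResult and callers opt into blocking.
    result = p.run()
    result.wait_until_finish()
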

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74dda50e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74dda50e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74dda50e

Branch: refs/heads/python-sdk
Commit: 74dda50e64a93ab3c147ac24f7436ef04467aa27
Parents: f25c0e4
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jan 9 18:23:20 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 09:55:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  2 +-
 .../apache_beam/runners/dataflow_runner.py      | 60 +++++++++++++-------
 .../apache_beam/runners/direct/direct_runner.py | 15 ++---
 sdks/python/apache_beam/runners/runner.py       | 35 +++++++++++-
 sdks/python/apache_beam/runners/runner_test.py  |  1 +
 .../apache_beam/runners/template_runner_test.py |  4 +-
 .../runners/test/test_dataflow_runner.py        |  4 +-
 .../apache_beam/utils/pipeline_options.py       |  3 +-
 sdks/python/run_postcommit.sh                   |  2 +-
 9 files changed, 90 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6517960..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -167,7 +167,7 @@ class Pipeline(object):
 
   def __exit__(self, exc_type, exc_val, exc_tb):
     if not exc_type:
-      self.run()
+      self.run().wait_until_finish()
 
   def visit(self, visitor):
     """Visits depth-first every node of a pipeline's DAG.

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3505acc..330472b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,7 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
-    runner.result = DataflowPipelineResult(response)
+    runner.result = DataflowPipelineResult(response, runner)
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -176,23 +176,11 @@ class DataflowRunner(PipelineRunner):
 
     # Create the job
     self.result = DataflowPipelineResult(
-        self.dataflow_client.create_job(self.job))
+        self.dataflow_client.create_job(self.job), self)
 
     if self.result.has_job and self.blocking:
-      thread = threading.Thread(
-          target=DataflowRunner.poll_for_job_completion,
-          args=(self, self.result.job_id()))
-      # Mark the thread as a daemon thread so a keyboard interrupt on the main
-      # thread will terminate everything. This is also the reason we will not
-      # use thread.join() to wait for the polling thread.
-      thread.daemon = True
-      thread.start()
-      while thread.isAlive():
-        time.sleep(5.0)
-      if self.result.current_state() != PipelineState.DONE:
-        raise DataflowRuntimeException(
-            'Dataflow pipeline failed:\n%s'
-            % getattr(self, 'last_error_msg', None), self.result)
+      self.result.wait_until_finish()
+
     return self.result
 
   def _get_typehint_based_encoding(self, typehint, window_coder):
@@ -651,9 +639,10 @@ class DataflowRunner(PipelineRunner):
 class DataflowPipelineResult(PipelineResult):
   """Represents the state of a pipeline run on the Dataflow service."""
 
-  def __init__(self, job):
+  def __init__(self, job, runner):
     """Job is a Job message from the Dataflow API."""
     self._job = job
+    self._runner = runner
 
   def job_id(self):
     return self._job.id
@@ -662,12 +651,16 @@ class DataflowPipelineResult(PipelineResult):
   def has_job(self):
     return self._job is not None
 
-  def current_state(self):
+  @property
+  def state(self):
     """Return the current state of the remote job.
 
     Returns:
       A PipelineState object.
     """
+    if not self.has_job:
+      return PipelineState.UNKNOWN
+
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
     api_jobstate_map = {
         values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
@@ -684,11 +677,40 @@ class DataflowPipelineResult(PipelineResult):
     return (api_jobstate_map[self._job.currentState] if self._job.currentState
             else PipelineState.UNKNOWN)
 
+  def _is_in_terminal_state(self):
+    if not self.has_job:
+      return True
+
+    return self.state in [
+        PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
+        PipelineState.CANCELLED, PipelineState.DRAINED]
+
+  def wait_until_finish(self, duration=None):
+    if not self._is_in_terminal_state():
+      if not self.has_job:
+        raise IOError('Failed to get the Dataflow job id.')
+      if duration:
+        raise NotImplementedError(
+            'DataflowRunner does not support duration argument.')
+
+      thread = threading.Thread(
+          target=DataflowRunner.poll_for_job_completion,
+          args=(self._runner, self.job_id()))
+
+      # Mark the thread as a daemon thread so a keyboard interrupt on the main
+      # thread will terminate everything. This is also the reason we will not
+      # use thread.join() to wait for the polling thread.
+      thread.daemon = True
+      thread.start()
+      while thread.isAlive():
+        time.sleep(5.0)
+    return self.state
+
   def __str__(self):
     return '<%s %s %s>' % (
         self.__class__.__name__,
         self.job_id(),
-        self.current_state())
+        self.state)
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))

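With the raise-on-failure block moved out of run(), checking the outcome is
now the caller's job. A hedged sketch of how a caller might use the new state
property and the value wait_until_finish() returns:

    from apache_beam.runners.runner import PipelineState

    result = p.run()
    final_state = result.wait_until_finish()  # polls every 5s until terminal
    if final_state != PipelineState.DONE:
      raise RuntimeError('Dataflow job %s ended in state %s'
                         % (result.job_id(), final_state))
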
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index a5c616b..dc2668d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -43,7 +43,7 @@ class DirectRunner(PipelineRunner):
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
-    # TODO: Move imports to top. Pipeline <-> Runner dependecy cause problems
+    # TODO: Move imports to top. Pipeline <-> Runner dependency causes problems
     # with resolving imports when they are at top.
     # pylint: disable=wrong-import-position
     from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \
@@ -76,12 +76,10 @@ class DirectRunner(PipelineRunner):
     executor.start(self.visitor.root_transforms)
     result = DirectPipelineResult(executor, evaluation_context)
 
-    # TODO(altay): If blocking:
-    # Block until the pipeline completes. This call will return after the
-    # pipeline was fully terminated (successfully or with a failure).
-    result.await_completion()
-
     if self._cache:
+      # We are running in eager mode; block until the pipeline execution
+      # completes in order to have full results in the cache.
+      result.wait_until_finish()
       self._cache.finalize()
 
     return result
@@ -141,8 +139,11 @@ class DirectPipelineResult(PipelineResult):
   def _is_in_terminal_state(self):
     return self._state is not PipelineState.RUNNING
 
-  def await_completion(self):
+  def wait_until_finish(self, duration=None):
     if not self._is_in_terminal_state():
+      if duration:
+        raise NotImplementedError(
+            'DirectRunner does not support duration argument.')
       try:
         self._executor.await_completion()
         self._state = PipelineState.DONE

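After this change the direct runner's run() starts the executor and returns
without blocking (except in eager mode); waiting moves into the result object.
A small sketch of the contract the diff establishes:

    result = p.run()            # executor started; pipeline may still be running
    result.wait_until_finish()  # blocks via the executor's await_completion()
    # Passing a duration, e.g. wait_until_finish(duration=1000), raises
    # NotImplementedError for this runner.
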
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 3dc4d28..1a50df4 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -287,7 +287,7 @@ class PValueCache(object):
 
 
 class PipelineState(object):
-  """State of the Pipeline, as returned by PipelineResult.current_state().
+  """State of the Pipeline, as returned by PipelineResult.state.
 
   This is meant to be the union of all the states any runner can put a
   pipeline in.  Currently, it represents the values of the dataflow
@@ -310,10 +310,39 @@ class PipelineResult(object):
   def __init__(self, state):
     self._state = state
 
-  def current_state(self):
-    """Return the current state of running the pipeline."""
+  @property
+  def state(self):
+    """Return the current state of the pipeline execution."""
     return self._state
 
+  def wait_until_finish(self, duration=None):
+    """Waits until the pipeline finishes and returns the final status.
+
+    Args:
+      duration: The time to wait (in milliseconds) for the job to finish. If it
+        set to None, it will wait indefinitely until the job is finished.
+
+    Raises:
+      IOError: If there is a persistent problem getting job information.
+      NotImplementedError: If the runner does not support this operation.
+
+    Returns:
+      The final state of the pipeline, or None on timeout.
+    """
+    raise NotImplementedError
+
+  def cancel(self):
+    """Cancels the pipeline execution.
+
+    Raises:
+      IOError: If there is a persistent problem getting job information.
+      NotImplementedError: If the runner does not support this operation.
+
+    Returns:
+      The final state of the pipeline.
+    """
+    raise NotImplementedError
+
   # pylint: disable=unused-argument
   def aggregated_values(self, aggregator_or_name):
     """Return a dict of step names to values of the Aggregator."""

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ea86061..2b6c316 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -161,6 +161,7 @@ class RunnerTest(unittest.TestCase):
     (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
      | 'do' >> beam.ParDo(MyDoFn()))
     result = p.run()
+    result.wait_until_finish()
     metrics = result.metrics().query()
     namespace = '{}.{}'.format(MyDoFn.__module__,
                                MyDoFn.__name__)
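
The extra wait matters because a non-blocking run() can return before all
metrics are committed, so querying earlier could observe a partially executed
pipeline. An illustrative pattern (the pipeline contents are assumptions, not
taken from this test):

    import apache_beam as beam

    p = beam.Pipeline('DirectRunner')
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned

    result = p.run()
    result.wait_until_finish()  # Ensure execution completed before querying.
    metric_results = result.metrics().query()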

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 457022d..af7f2c6 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -55,7 +55,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
                             '--no_auth=True']))
 
     pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
-    pipeline.run()
+    pipeline.run().wait_until_finish()
     with open(dummy_file_name) as template_file:
       saved_job_dict = json.load(template_file)
       self.assertEqual(
@@ -81,7 +81,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(pipeline.options)
 
     with self.assertRaises(IOError):
-      pipeline.run()
+      pipeline.run().wait_until_finish()
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index 77655bd..823e534 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -25,14 +25,16 @@ from apache_beam.utils.pipeline_options import TestOptions
 class TestDataflowRunner(DataflowRunner):
 
   def __init__(self):
-    super(TestDataflowRunner, self).__init__(blocking=True)
+    super(TestDataflowRunner, self).__init__()
 
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     self.result = super(TestDataflowRunner, self).run(pipeline)
+    self.result.wait_until_finish()
 
     options = pipeline.options.view_as(TestOptions)
     if options.on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+
     return self.result
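
Since on_success_matcher receives the already-finished PipelineResult, a
matcher can assert on the final state. A hedged sketch of such a matcher
(PipelineDoneMatcher is a hypothetical class, shown only to illustrate the
hook):

    from hamcrest.core.base_matcher import BaseMatcher

    from apache_beam.internal import pickler
    from apache_beam.runners.runner import PipelineState

    class PipelineDoneMatcher(BaseMatcher):
      """Hypothetical matcher: passes if the pipeline reached DONE."""

      def _matches(self, pipeline_result):
        return pipeline_result.state == PipelineState.DONE

      def describe_to(self, description):
        description.append_text('pipeline state is DONE')

    # Pickled and passed through TestOptions as --on_success_matcher.
    serialized_matcher = pickler.dumps(PipelineDoneMatcher())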

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 9f57ee7..16b1640 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -182,8 +182,7 @@ class StandardOptions(PipelineOptions):
     parser.add_argument(
         '--runner',
         help=('Pipeline runner used to execute the workflow. Valid values are '
-              'DirectRunner, DataflowRunner, '
-              'and BlockingDataflowRunner.'))
+              'DirectRunner and DataflowRunner.'))
     # Whether to enable streaming mode.
     parser.add_argument('--streaming',
                         default=False,
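
The flag feeds StandardOptions.runner. A quick sketch of the parse, assuming
an illustrative flag value:

    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import StandardOptions

    options = PipelineOptions(['--runner=DirectRunner'])
    assert options.view_as(StandardOptions).runner == 'DirectRunner'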

http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 67a257e..2e419a5 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -74,7 +74,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
 echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS"
 python setup.py nosetests \
   -a ValidatesRunner --test-pipeline-options=" \
-    --runner=BlockingDataflowRunner \
+    --runner=TestDataflowRunner \
     --project=$PROJECT \
     --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
     --temp_location=$GCS_LOCATION/temp-validatesrunner-test \