You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 18:11:36 UTC

[4/6] beam git commit: Update tests to use TestPipeline()

Update tests to use TestPipeline()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ded9185
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ded9185
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ded9185

Branch: refs/heads/python-sdk
Commit: 4ded9185c063eae9f54ce457a1fa753679d1ce82
Parents: 703c1bc
Author: Ahmet Altay <al...@google.com>
Authored: Sun Jan 15 00:35:49 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 10:01:41 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py        | 13 +--
 .../apache_beam/tests/pipeline_verifiers.py     |  4 +-
 .../apache_beam/transforms/aggregator_test.py   |  3 +-
 .../apache_beam/transforms/combiners_test.py    | 26 +++---
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   | 93 ++++++++++----------
 .../apache_beam/transforms/window_test.py       | 10 +--
 .../transforms/write_ptransform_test.py         |  5 +-
 .../typehints/typed_pipeline_test.py            |  6 +-
 9 files changed, 81 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d6925d4..336bf54 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -25,6 +25,7 @@ from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
@@ -94,7 +95,7 @@ class PipelineTest(unittest.TestCase):
       self.leave_composite.append(transform_node)
 
   def test_create(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'label1' >> Create([1, 2, 3])
     assert_that(pcoll, equal_to([1, 2, 3]))
 
@@ -105,13 +106,13 @@ class PipelineTest(unittest.TestCase):
     pipeline.run()
 
   def test_create_singleton_pcollection(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
   def test_read(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
     assert_that(pcoll, equal_to([1, 2, 3]))
     pipeline.run()
@@ -136,7 +137,7 @@ class PipelineTest(unittest.TestCase):
     self.assertEqual(visitor.leave_composite[0].transform, transform)
 
   def test_apply_custom_transform(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
     result = pcoll | PipelineTest.CustomTransform()
     assert_that(result, equal_to([2, 3, 4]))
@@ -158,7 +159,7 @@ class PipelineTest(unittest.TestCase):
         'pvalue | "label" >> transform')
 
   def test_reuse_cloned_custom_transform_instance(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = TestPipeline(runner=self.runner_name)
     pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
     pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
@@ -207,7 +208,7 @@ class PipelineTest(unittest.TestCase):
     num_elements = 10
     num_maps = 100
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline(runner='DirectRunner')
 
     # Consumed memory should not be proportional to the number of maps.
     memory_threshold = (

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 6bf8d48..9b286a2 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -46,7 +46,7 @@ class PipelineStateMatcher(BaseMatcher):
     self.expected_state = expected_state
 
   def _matches(self, pipeline_result):
-    return pipeline_result.current_state() == self.expected_state
+    return pipeline_result.state == self.expected_state
 
   def describe_to(self, description):
     description \
@@ -56,7 +56,7 @@ class PipelineStateMatcher(BaseMatcher):
   def describe_mismatch(self, pipeline_result, mismatch_description):
     mismatch_description \
       .append_text("Test pipeline job terminated in state: ") \
-      .append_text(pipeline_result.current_state())
+      .append_text(pipeline_result.state)
 
 
 def retry_on_io_error_and_server_error(exception):

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index e77dfba..d493c46 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -22,6 +22,7 @@ import unittest
 import apache_beam as beam
 from apache_beam.transforms import combiners
 from apache_beam.transforms.aggregator import Aggregator
+from apache_beam.test_pipeline import TestPipeline
 
 
 class AggregatorTest(unittest.TestCase):
@@ -63,7 +64,7 @@ class AggregatorTest(unittest.TestCase):
         for a in aggregators:
           context.aggregate_to(a, context.element)
 
-    p = beam.Pipeline('DirectRunner')
+    p = TestPipeline()
     p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: disable=expression-not-assigned
     res = p.run()
     for (_, _, expected), a in zip(counter_types, aggregators):

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 72fce60..8a6d352 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -22,7 +22,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create
@@ -40,7 +40,7 @@ class CombineTest(unittest.TestCase):
     combine.TopCombineFn._MIN_BUFFER_OVERSIZE = 1
 
   def test_builtin_combines(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     mean = sum(vals) / float(len(vals))
@@ -62,7 +62,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_top(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     # A parameter we'll be sharing with a custom comparator.
     names = {0: 'zo',
@@ -201,7 +201,7 @@ class CombineTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_top_shorthands(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
 
     pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
     result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
@@ -222,7 +222,7 @@ class CombineTest(unittest.TestCase):
 
     # First test global samples (lots of them).
     for ix in xrange(300):
-      pipeline = Pipeline('DirectRunner')
+      pipeline = TestPipeline()
       pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
@@ -241,7 +241,7 @@ class CombineTest(unittest.TestCase):
       pipeline.run()
 
     # Now test per-key samples.
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start-perkey' >> Create(
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
@@ -258,7 +258,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_tuple_combine_fn(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (
         p
         | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
@@ -269,7 +269,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_tuple_combine_fn_without_defaults(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (
         p
         | Create([1, 1, 2, 3])
@@ -280,7 +280,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_to_list_and_to_dict(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     pcoll = pipeline | 'start' >> Create(the_list)
     result = pcoll | 'to list' >> combine.ToList()
@@ -292,7 +292,7 @@ class CombineTest(unittest.TestCase):
     assert_that(result, matcher([the_list]))
     pipeline.run()
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pairs = [(1, 2), (3, 4), (5, 6)]
     pcoll = pipeline | 'start-pairs' >> Create(pairs)
     result = pcoll | 'to dict' >> combine.ToDict()
@@ -306,12 +306,12 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_combine_globally_with_default(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
     p.run()
 
   def test_combine_globally_without_default(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = p | Create([]) | CombineGlobally(sum).without_defaults()
     assert_that(result, equal_to([]))
     p.run()
@@ -323,7 +323,7 @@ class CombineTest(unittest.TestCase):
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
 
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
     result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 006a937..b5ac64b 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -443,7 +443,7 @@ class PTransform(WithTypeHints, HasDisplayData):
       # Get a reference to the runners internal cache, otherwise runner may
       # clean it after run.
       cache = p.runner.cache
-      p.run()
+      p.run().wait_until_finish()
       return _MaterializePValues(cache).visit(result)
 
   def _extract_input_pvalues(self, pvalueish):

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 705e85e..58382e4 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -26,7 +26,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 import apache_beam.pvalue as pvalue
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
@@ -36,7 +36,6 @@ import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import TypeOptions
 
 
@@ -54,12 +53,12 @@ class PTransformTest(unittest.TestCase):
     self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
                      str(PTransform()))
 
-    pa = Pipeline('DirectRunner')
+    pa = TestPipeline()
     res = pa | 'a_label' >> beam.Create([1, 2])
     self.assertEqual('AppliedPTransform(a_label, Create)',
                      str(res.producer))
 
-    pc = Pipeline('DirectRunner')
+    pc = TestPipeline()
     res = pc | beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
@@ -67,7 +66,7 @@ class PTransformTest(unittest.TestCase):
         """<Create(PTransform) label=[Create] inputs=('ci',)>""",
         str(inputs_tr))
 
-    pd = Pipeline('DirectRunner')
+    pd = TestPipeline()
     res = pd | beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
@@ -111,7 +110,7 @@ class PTransformTest(unittest.TestCase):
       def process(self, context, addon):
         return [context.element + addon]
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
@@ -123,20 +122,20 @@ class PTransformTest(unittest.TestCase):
       def process(self, context):
         pass
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
       pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -145,7 +144,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_side_input_as_keyword_arg(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -154,7 +153,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
 
@@ -168,7 +167,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
 
@@ -182,7 +181,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_side_outputs_maintains_unique_name(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
     r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
@@ -195,7 +194,7 @@ class PTransformTest(unittest.TestCase):
     # iterable.
     def incorrect_par_do_fn(x):
       return x + 5
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
     pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions to a ParDo return
@@ -216,7 +215,7 @@ class PTransformTest(unittest.TestCase):
 
       def finish_bundle(self, c):
         yield 'finish'
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(MyDoFn())
 
@@ -231,7 +230,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_filter(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
     result = pcoll | beam.Filter(
         'filter', lambda x: x % 2 == 0)
@@ -257,7 +256,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
@@ -265,7 +264,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
@@ -273,7 +272,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(values)
     divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombineGlobally(
@@ -288,7 +287,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_combine_fn(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
@@ -299,7 +298,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_callable(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
@@ -309,7 +308,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_side_input_as_arg(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     divisor = pipeline | 'divisor' >> beam.Create([2])
@@ -322,7 +321,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
     result = pcoll | 'group' >> beam.GroupByKey()
@@ -336,7 +335,7 @@ class PTransformTest(unittest.TestCase):
       def partition_for(self, context, num_partitions, offset):
         return (context.element % 3) + offset
 
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
     partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
@@ -348,14 +347,14 @@ class PTransformTest(unittest.TestCase):
 
     # Check that a bad partition label will yield an error. For the
     # DirectRunner, this error manifests as an exception.
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
@@ -370,7 +369,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_partition_followed_by_flatten_and_groupbykey(self):
     """Regression test for an issue with how partitions are handled."""
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     contents = [('aa', 1), ('bb', 2), ('aa', 2)]
     created = pipeline | 'A' >> beam.Create(contents)
     partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
@@ -380,7 +379,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
@@ -388,7 +387,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     with self.assertRaises(ValueError):
       () | 'pipeline arg missing' >> beam.Flatten()
     result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
@@ -396,7 +395,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
@@ -417,7 +416,7 @@ class PTransformTest(unittest.TestCase):
       set([1, 2, 3]) | 'flatten' >> beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -429,7 +428,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_iterable(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -442,7 +441,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_dict(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -454,7 +453,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
 
     with self.assertRaises(typehints.TypeCheckError) as e:
@@ -467,7 +466,7 @@ class PTransformTest(unittest.TestCase):
         'Tuple[TypeVariable[K], TypeVariable[V]]')
 
   def test_group_by_key_only_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
       pcolls | 'D' >> beam.GroupByKeyOnly()
@@ -478,7 +477,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_keys_and_values(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
     keys = pcoll.apply('keys', beam.Keys())
@@ -488,7 +487,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_kv_swap(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
     result = pcoll.apply('swap', beam.KvSwap())
@@ -496,7 +495,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_remove_duplicates(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
     result = pcoll.apply('nodupes', beam.RemoveDuplicates())
@@ -504,7 +503,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_chained_ptransforms(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
@@ -581,7 +580,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_chained_ptransforms(self):
     """Tests that chaining gets proper nesting."""
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     map1 = beam.Map('map1', lambda x: (x, 1))
     gbk = beam.GroupByKey('gbk')
     map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
@@ -594,7 +593,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_without_label(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
@@ -604,7 +603,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_with_label(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform('*custom*')
     result = pipeline.apply(custom, pcoll)
@@ -615,7 +614,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
@@ -624,7 +623,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_ptransform_using_decorator(self):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     sample = SamplePTransform('*sample*')
     _ = pcoll | sample
@@ -635,7 +634,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally('*sum*', sum)
     result = pcoll | combine
@@ -644,7 +643,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def check_label(self, ptransform, expected_label):
-    pipeline = Pipeline('DirectRunner')
+    pipeline = TestPipeline()
     pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
     actual_label = sorted(pipeline.applied_labels - {'start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
@@ -728,7 +727,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                     '"%s" does not start with "%s"' % (msg, prefix))
 
   def setUp(self):
-    self.p = Pipeline(options=PipelineOptions([]))
+    self.p = TestPipeline()
 
   def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
     @with_input_types(int, int)

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 856d011..d4e8e25 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
 
 import unittest
 
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core
@@ -140,7 +140,7 @@ class WindowTest(unittest.TestCase):
             | Map(lambda x: WindowedValue((key, x), x, [])))
 
   def test_sliding_windows(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
     result = (pcoll
               | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
@@ -153,7 +153,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_sessions(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
     result = (pcoll
               | 'w' >> WindowInto(Sessions(10))
@@ -166,7 +166,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_value(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (p
               | 'start' >> Create([(k, k) for k in range(10)])
               | Map(lambda (x, t): TimestampedValue(x, t))
@@ -178,7 +178,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_with_combiners(self):
-    p = Pipeline('DirectRunner')
+    p = TestPipeline()
     result = (p
               # Create some initial test values.
               | 'start' >> Create([(k, k) for k in range(10)])

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index f96dffb..3d7fbd9 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -22,10 +22,9 @@ import unittest
 import apache_beam as beam
 
 from apache_beam.io import iobase
-from apache_beam.pipeline import Pipeline
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, is_empty
-from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class _TestSink(iobase.Sink):
@@ -99,7 +98,7 @@ class WriteTest(unittest.TestCase):
                       return_write_results=True):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
-    p = Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
 
     assert_that(result, is_empty())

http://git-wip-us.apache.org/repos/asf/beam/blob/4ded9185/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 8b5e3f4..35987b7 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -23,10 +23,10 @@ import unittest
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
 from apache_beam.utils.pipeline_options import OptionsContext
-from apache_beam.utils.pipeline_options import PipelineOptions
 
 # These test often construct a pipeline as value | PTransform to test side
 # effects (e.g. errors).
@@ -168,7 +168,7 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
       return s * times
-    p = beam.Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     main_input = p | beam.Create(['a', 'bb', 'c'])
     side_input = p | 'side' >> beam.Create([3])
     result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
@@ -183,7 +183,7 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, typehints.Iterable[str])
     def concat(glue, items):
       return glue.join(sorted(items))
-    p = beam.Pipeline(options=PipelineOptions([]))
+    p = TestPipeline()
     main_input = p | beam.Create(['a', 'bb', 'c'])
     side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
     result = main_input | beam.Map(concat, pvalue.AsIter(side_input))