Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/23 04:22:51 UTC

[1/4] incubator-beam git commit: Remove the word 'Pipeline' from the PipelineRunner subclasses.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 82d7f0f77 -> 9c37274dd


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 012dde4..6c3c98e 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -131,7 +131,7 @@ class WindowTest(unittest.TestCase):
             | Map(lambda x: WindowedValue((key, x), x, [])))
 
   def test_sliding_windows(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
     result = (pcoll
               | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
@@ -144,7 +144,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_sessions(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
     result = (pcoll
               | 'w' >> WindowInto(Sessions(10))
@@ -157,7 +157,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_value(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = (p
               | 'start' >> Create([(k, k) for k in range(10)])
               | Map(lambda (x, t): TimestampedValue(x, t))
@@ -169,7 +169,7 @@ class WindowTest(unittest.TestCase):
     p.run()
 
   def test_timestamped_with_combiners(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = (p
               # Create some initial test values.
               | 'start' >> Create([(k, k) for k in range(10)])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index 5ebb4b4..ff7da77 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -178,15 +178,15 @@ class PipelineOptions(HasDisplayData):
 
 class StandardOptions(PipelineOptions):
 
-  DEFAULT_RUNNER = 'DirectPipelineRunner'
+  DEFAULT_RUNNER = 'DirectRunner'
 
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--runner',
         help=('Pipeline runner used to execute the workflow. Valid values are '
-              'DirectPipelineRunner, DataflowPipelineRunner, '
-              'and BlockingDataflowPipelineRunner.'))
+              'DirectRunner, DataflowRunner, '
+              'and BlockingDataflowRunner.'))
     # Whether to enable streaming mode.
     parser.add_argument('--streaming',
                         default=False,
@@ -218,7 +218,7 @@ class TypeOptions(PipelineOptions):
                         action='store_true',
                         help='Enable type checking at pipeline execution '
                         'time. NOTE: only supported with the '
-                        'DirectPipelineRunner')
+                        'DirectRunner')
 
 
 class GoogleCloudOptions(PipelineOptions):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index ab42b65..c1243ce 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -105,8 +105,8 @@ class PipelineOptionsValidator(object):
     """True if pipeline will execute on the Google Cloud Dataflow service."""
     is_service_runner = (self.runner is not None and
                          type(self.runner).__name__ in [
-                             'DataflowPipelineRunner',
-                             'BlockingDataflowPipelineRunner',
+                             'DataflowRunner',
+                             'BlockingDataflowRunner',
                              'TestDataflowRunner'])
 
     dataflow_endpoint = (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 49028c7..8878c3f 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -29,7 +29,7 @@ from hamcrest.core.base_matcher import BaseMatcher
 # Mock runners to use for validations.
 class MockRunners(object):
 
-  class DataflowPipelineRunner(object):
+  class DataflowRunner(object):
     pass
 
   class TestDataflowRunner(object):
@@ -75,7 +75,7 @@ class SetupTest(unittest.TestCase):
 
   def test_missing_required_options(self):
     options = PipelineOptions([''])
-    runner = MockRunners.DataflowPipelineRunner()
+    runner = MockRunners.DataflowRunner()
     validator = PipelineOptionsValidator(options, runner)
     errors = validator.validate()
 
@@ -96,7 +96,7 @@ class SetupTest(unittest.TestCase):
         options.append('--staging_location=' + staging_location)
 
       pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowPipelineRunner()
+      runner = MockRunners.DataflowRunner()
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
@@ -151,7 +151,7 @@ class SetupTest(unittest.TestCase):
         options.append('--project=' + project)
 
       pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowPipelineRunner()
+      runner = MockRunners.DataflowRunner()
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
@@ -179,7 +179,7 @@ class SetupTest(unittest.TestCase):
         options.append('--job_name=' + job_name)
 
       pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowPipelineRunner()
+      runner = MockRunners.DataflowRunner()
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
@@ -207,7 +207,7 @@ class SetupTest(unittest.TestCase):
         options.append('--num_workers=' + num_workers)
 
       pipeline_options = PipelineOptions(options)
-      runner = MockRunners.DataflowPipelineRunner()
+      runner = MockRunners.DataflowRunner()
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
@@ -241,27 +241,27 @@ class SetupTest(unittest.TestCase):
             'expected': False,
         },
         {
-            'runner': MockRunners.DataflowPipelineRunner(),
+            'runner': MockRunners.DataflowRunner(),
             'options': ['--dataflow_endpoint=https://another.service.com'],
             'expected': False,
         },
         {
-            'runner': MockRunners.DataflowPipelineRunner(),
+            'runner': MockRunners.DataflowRunner(),
             'options': ['--dataflow_endpoint=https://another.service.com/'],
             'expected': False,
         },
         {
-            'runner': MockRunners.DataflowPipelineRunner(),
+            'runner': MockRunners.DataflowRunner(),
             'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
             'expected': True,
         },
         {
-            'runner': MockRunners.DataflowPipelineRunner(),
+            'runner': MockRunners.DataflowRunner(),
             'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
             'expected': True,
         },
         {
-            'runner': MockRunners.DataflowPipelineRunner(),
+            'runner': MockRunners.DataflowRunner(),
             'options': [],
             'expected': True,
         },

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 1652936..968d4f9 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -73,7 +73,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
 # Run ValidatesRunner tests on Google Cloud Dataflow service
 python setup.py nosetests \
   -a ValidatesRunner --test-pipeline-options=" \
-    --runner=BlockingDataflowPipelineRunner \
+    --runner=BlockingDataflowRunner \
     --project=$PROJECT \
     --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
     --temp_location=$GCS_LOCATION/temp-validatesrunner-test \
@@ -86,7 +86,7 @@ python -m apache_beam.examples.wordcount \
   --output $GCS_LOCATION/py-wordcount-cloud \
   --staging_location $GCS_LOCATION/staging-wordcount \
   --temp_location $GCS_LOCATION/temp-wordcount \
-  --runner BlockingDataflowPipelineRunner \
+  --runner BlockingDataflowRunner \
   --job_name $JOBNAME_E2E \
   --project $PROJECT \
   --sdk_location $SDK_LOCATION \


[3/4] incubator-beam git commit: Add a deprecation warning about BlockingDataflowRunner.

Posted by ro...@apache.org.
Add a deprecation warning about BlockingDataflowRunner.

Updated TestDataflowRunner to inherit from DataflowRunner.
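
For context, a minimal sketch of how the new warning surfaces, assuming a
default logging setup (the warning text matches the runner.py change below;
this usage snippet is illustrative, not part of the commit):

  import logging
  from apache_beam.runners.runner import create_runner

  logging.basicConfig(level=logging.WARNING)
  # Still resolves to a blocking Dataflow runner, but now also logs:
  #   BlockingDataflowRunner is deprecated, use DataflowRunner instead.
  runner = create_runner('BlockingDataflowRunner')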


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd5bf2cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd5bf2cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd5bf2cd

Branch: refs/heads/python-sdk
Commit: fd5bf2cd0dec86faffaa73ab861486c7044b26f4
Parents: 68dd9b5
Author: Ahmet Altay <al...@google.com>
Authored: Thu Dec 22 13:30:16 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 20:22:27 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py                    | 7 ++++++-
 sdks/python/apache_beam/runners/test/test_dataflow_runner.py | 4 ++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd5bf2cd/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 3558d41..3dc4d28 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -30,7 +30,7 @@ _KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
 _KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner', 'BlockingDataflowRunner')
 _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
 _ALL_KNOWN_RUNNERS = (
-  _KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS + _KNOWN_TEST_RUNNERS)
+    _KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS + _KNOWN_TEST_RUNNERS)
 
 
 def create_runner(runner_name):
@@ -55,6 +55,11 @@ def create_runner(runner_name):
           '%s is deprecated, use %s instead.', runner_name, new_runner_name)
       runner_name = new_runner_name
 
+  # TODO(BEAM-759): Remove when all BlockingDataflowRunner references are gone.
+  if runner_name == 'BlockingDataflowRunner':
+    logging.warning(
+        'BlockingDataflowRunner is deprecated, use DataflowRunner instead.')
+
   if runner_name in _KNOWN_DIRECT_RUNNERS:
     runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name
   elif runner_name in _KNOWN_DATAFLOW_RUNNERS:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd5bf2cd/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index aaf5eb8..a58ab33 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -18,11 +18,11 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 
 from apache_beam.internal import pickler
-from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.runners.dataflow_runner import DataflowRunner
 from apache_beam.utils.options import TestOptions
 
 
-class TestDataflowRunner(DataflowPipelineRunner):
+class TestDataflowRunner(DataflowRunner):
 
   def __init__(self):
     super(TestDataflowRunner, self).__init__(blocking=True)


[2/4] incubator-beam git commit: Remove the word 'Pipeline' from the PipelineRunner subclasses.

Posted by ro...@apache.org.
Remove the word 'Pipeline' from the PipelineRunner subclasses.

The change is mostly mechanical, except for runners.py (and its
test). Temporarily added a way to avoid breaking existing users of these
runners. After a grace period following the new year, I will remove that
piece of code.
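
A minimal sketch of that temporary compatibility shim, with the mapping and
helper names assumed (the real logic lives in create_runner in
sdks/python/apache_beam/runners/runner.py; only its context lines appear in
the diffs below):

  import logging

  # Hypothetical table of deprecated runner names and their replacements.
  _DEPRECATED_RUNNER_NAMES = {
      'DirectPipelineRunner': 'DirectRunner',
      'EagerPipelineRunner': 'EagerRunner',
      'DataflowPipelineRunner': 'DataflowRunner',
      'BlockingDataflowPipelineRunner': 'BlockingDataflowRunner',
  }

  def _resolve_runner_name(runner_name):
    # Translate a deprecated runner name to its replacement and warn,
    # as create_runner does during the grace period.
    if runner_name in _DEPRECATED_RUNNER_NAMES:
      new_runner_name = _DEPRECATED_RUNNER_NAMES[runner_name]
      logging.warning(
          '%s is deprecated, use %s instead.', runner_name, new_runner_name)
      return new_runner_name
    return runner_name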


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68dd9b5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68dd9b5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68dd9b5e

Branch: refs/heads/python-sdk
Commit: 68dd9b5e0518880f0329356cb2fd1dfa7e1a9dec
Parents: 82d7f0f
Author: Ahmet Altay <al...@google.com>
Authored: Wed Dec 21 19:02:09 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 20:22:26 2016 -0800

----------------------------------------------------------------------
 sdks/python/README.md                           | 16 ++--
 .../examples/complete/autocomplete_test.py      |  2 +-
 .../examples/complete/estimate_pi_test.py       |  2 +-
 .../examples/complete/juliaset/juliaset_main.py |  2 +-
 .../apache_beam/examples/complete/tfidf_test.py |  2 +-
 .../examples/complete/top_wikipedia_sessions.py |  4 +-
 .../complete/top_wikipedia_sessions_test.py     |  2 +-
 .../cookbook/bigquery_side_input_test.py        |  2 +-
 .../cookbook/bigquery_tornadoes_test.py         |  2 +-
 .../examples/cookbook/coders_test.py            |  2 +-
 .../examples/cookbook/combiners_test.py         |  4 +-
 .../examples/cookbook/custom_ptransform_test.py |  2 +-
 .../examples/cookbook/filters_test.py           |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 10 +--
 .../examples/snippets/snippets_test.py          | 10 +--
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 .../apache_beam/examples/wordcount_minimal.py   |  4 +-
 .../apache_beam/internal/apiclient_test.py      |  4 +-
 sdks/python/apache_beam/io/avroio_test.py       | 10 +--
 .../python/apache_beam/io/concat_source_test.py |  2 +-
 .../apache_beam/io/filebasedsource_test.py      | 18 ++--
 sdks/python/apache_beam/io/fileio_test.py       | 10 +--
 sdks/python/apache_beam/io/sources_test.py      |  2 +-
 sdks/python/apache_beam/io/textio_test.py       | 24 +++---
 sdks/python/apache_beam/pipeline.py             |  2 +-
 sdks/python/apache_beam/pipeline_test.py        |  6 +-
 sdks/python/apache_beam/pvalue_test.py          |  4 +-
 sdks/python/apache_beam/runners/__init__.py     |  6 +-
 .../apache_beam/runners/dataflow_runner.py      | 14 +--
 .../apache_beam/runners/direct/__init__.py      |  2 +-
 .../consumer_tracking_pipeline_visitor_test.py  |  4 +-
 .../apache_beam/runners/direct/direct_runner.py | 10 +--
 .../runners/direct/evaluation_context.py        |  4 +-
 sdks/python/apache_beam/runners/runner.py       | 23 +++--
 sdks/python/apache_beam/runners/runner_test.py  | 37 +++++---
 .../apache_beam/runners/template_runner_test.py |  8 +-
 sdks/python/apache_beam/test_pipeline.py        |  2 +-
 .../apache_beam/transforms/aggregator_test.py   |  2 +-
 .../apache_beam/transforms/combiners_test.py    | 24 +++---
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   | 90 ++++++++++----------
 .../apache_beam/transforms/sideinputs_test.py   |  2 +-
 .../apache_beam/transforms/trigger_test.py      |  6 +-
 .../apache_beam/transforms/window_test.py       |  8 +-
 sdks/python/apache_beam/utils/options.py        |  8 +-
 .../utils/pipeline_options_validator.py         |  4 +-
 .../utils/pipeline_options_validator_test.py    | 22 ++---
 sdks/python/run_postcommit.sh                   |  4 +-
 49 files changed, 226 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/README.md b/sdks/python/README.md
index 5ea2a60..af0fb5e 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -171,7 +171,7 @@ the `|` operator is used to chain them.
 # Standard imports
 import apache_beam as beam
 # Create a pipeline executing on a direct runner (local, non-cloud).
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 # Create a PCollection with names and write it to a file.
 (p
  | 'add names' >> beam.Create(['Ann', 'Joe'])
@@ -186,7 +186,7 @@ The `Map` `PTransform` returns one output per input. It takes a callable that is
 
 ```python
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 # Read a file containing names, add a greeting to each name, and write to a file.
 (p
  | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
@@ -204,7 +204,7 @@ The `FlatMap` transform returns zero to many output per input. It accepts a call
 
 ```python
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 # Read a file containing names, add two greetings to each name, and write to a file.
 (p
  | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
@@ -222,7 +222,7 @@ a function using `yield`.
 
 ```python
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 # Read a file containing names, add two greetings to each name
 # (with FlatMap using a yield generator), and write to a file.
 def add_greetings(name, messages):
@@ -243,7 +243,7 @@ This example shows how to read a text file from [Google Cloud Storage](https://c
 ```python
 import re
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 (p
  | 'read' >> beam.Read(
     beam.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt'))
@@ -260,7 +260,7 @@ This is a somewhat forced example of `GroupByKey` to count words as the previous
 ```python
 import re
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 class MyCountTransform(beam.PTransform):
   def expand(self, pcoll):
     return (pcoll
@@ -286,7 +286,7 @@ of the data encoding.
 ```python
 import apache_beam as beam
 from apache_beam.typehints import typehints
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 (p
  | 'read' >> beam.Read(beam.io.TextFileSource('./names'))
  | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
@@ -347,7 +347,7 @@ Combiner transforms use "reducing" functions, such as sum, min, or max, to combi
 
 ```python
 import apache_beam as beam
-p = beam.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectRunner')
 
 SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 0d20482..5ed4fb5 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -30,7 +30,7 @@ class AutocompleteTest(unittest.TestCase):
   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
 
   def test_top_prefixes(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     words = p | beam.Create(self.WORDS)
     result = words | autocomplete.TopPerPrefix(5)
     # values must be hashable for now

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 3967ed5..0440ecc 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -38,7 +38,7 @@ def in_between(lower, upper):
 class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
 
     # Note: Probabilistically speaking this test can fail with a probability

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
index 45144cf..d6ba064 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -38,7 +38,7 @@ an example:
 python juliaset_main.py \
   --job_name juliaset-$USER \
   --project YOUR-PROJECT \
-  --runner BlockingDataflowPipelineRunner \
+  --runner BlockingDataflowRunner \
   --setup_file ./setup.py \
   --staging_location gs://YOUR-BUCKET/juliaset/staging \
   --temp_location gs://YOUR-BUCKET/juliaset/temp \

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index f30b832..deda4cb 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -47,7 +47,7 @@ class TfIdfTest(unittest.TestCase):
       f.write(contents)
 
   def test_tfidf_transform(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     uri_to_line = p | beam.Create(
         'create sample',
         [('1.txt', 'abc def ghi'),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 2d66d7f..fbce641 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -21,7 +21,7 @@ An example that reads Wikipedia edit data from Cloud Storage and computes the
 user with the longest string of edits separated by no more than an hour within
 each 30 day period.
 
-To execute this pipeline locally using the DirectPipelineRunner, specify an
+To execute this pipeline locally using the DirectRunner, specify an
 output prefix on GCS:
   --output gs://YOUR_OUTPUT_PREFIX
 
@@ -31,7 +31,7 @@ pipeline configuration in addition to the above:
   --project YOUR_PROJECT_ID
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMPORARY_DIRECTORY
-  --runner BlockingDataflowPipelineRunner
+  --runner BlockingDataflowRunner
 
 The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
 overridden with --input.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index a84cc78..1d25807 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -49,7 +49,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
   ]
 
   def test_compute_top_sessions(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index e2b20f3..97c41d6 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -27,7 +27,7 @@ from apache_beam.examples.cookbook import bigquery_side_input
 class BigQuerySideInputTest(unittest.TestCase):
 
   def test_create_groups(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
 
     group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
     corpus_pcoll = p | beam.Create('create_corpus',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 87e1f44..2cb2c45 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -27,7 +27,7 @@ from apache_beam.examples.cookbook import bigquery_tornadoes
 class BigQueryTornadoesTest(unittest.TestCase):
 
   def test_basics(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     rows = (p | 'create' >> beam.Create([
         {'month': 1, 'day': 1, 'tornado': False},
         {'month': 1, 'day': 2, 'tornado': True},

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 75b78c8..ec0848f 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -34,7 +34,7 @@ class CodersTest(unittest.TestCase):
       {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
 
   def test_compute_points(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
     result = (records
               | 'points' >> beam.FlatMap(coders.compute_points)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 56f5e78..18bd3bc 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -44,7 +44,7 @@ class CombinersTest(unittest.TestCase):
     can be used.
     """
     result = (
-        beam.Pipeline(runner=beam.runners.DirectPipelineRunner())
+        beam.Pipeline(runner=beam.runners.DirectRunner())
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(sum))
 
@@ -60,7 +60,7 @@ class CombinersTest(unittest.TestCase):
       return result
 
     result = (
-        beam.Pipeline(runner=beam.runners.DirectPipelineRunner())
+        beam.Pipeline(runner=beam.runners.DirectRunner())
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(multiply))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 806b031..91309ae 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -39,7 +39,7 @@ class CustomCountTest(unittest.TestCase):
     self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
 
   def run_pipeline(self, count_implementation, factor=1):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 9e5592f..81dd30f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -35,7 +35,7 @@ class FiltersTest(unittest.TestCase):
       ]
 
   def _get_result_for_month(self, month):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     rows = (p | 'create' >> beam.Create(self.input_data))
 
     results = filters.filter_cold_days(rows, month)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 167e709..d760e5a 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -40,7 +40,7 @@ pipeline configuration:
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMP_DIRECTORY
   --job_name YOUR_JOB_NAME
-  --runner BlockingDataflowPipelineRunner
+  --runner BlockingDataflowRunner
 
 and an output prefix on GCS:
   --output gs://YOUR_OUTPUT_PREFIX

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index f78ecd8..64878f3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -220,14 +220,14 @@ def pipeline_options_remote(argv):
   options = PipelineOptions(flags=argv)
 
   # For Cloud execution, set the Cloud Platform project, job_name,
-  # staging location, temp_location and specify DataflowPipelineRunner or
-  # BlockingDataflowPipelineRunner.
+  # staging location, temp_location and specify DataflowRunner or
+  # BlockingDataflowRunner.
   google_cloud_options = options.view_as(GoogleCloudOptions)
   google_cloud_options.project = 'my-project-id'
   google_cloud_options.job_name = 'myjob'
   google_cloud_options.staging_location = 'gs://my-bucket/binaries'
   google_cloud_options.temp_location = 'gs://my-bucket/temp'
-  options.view_as(StandardOptions).runner = 'DataflowPipelineRunner'
+  options.view_as(StandardOptions).runner = 'DataflowRunner'
 
   # Create the Pipeline with the specified options.
   p = Pipeline(options=options)
@@ -238,7 +238,7 @@ def pipeline_options_remote(argv):
   my_output = my_options.output
 
   # Overriding the runner for tests.
-  options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
+  options.view_as(StandardOptions).runner = 'DirectRunner'
   p = Pipeline(options=options)
 
   lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
@@ -436,7 +436,7 @@ def examples_wordcount_minimal(renames):
   google_cloud_options.job_name = 'myjob'
   google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
   google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
-  options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
+  options.view_as(StandardOptions).runner = 'BlockingDataflowRunner'
   # [END examples_wordcount_minimal_options]
 
   # Run it locally for testing.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index db2ea81..feb06c5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -112,7 +112,7 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
 
     # [START model_pardo_side_input]
@@ -230,7 +230,7 @@ class ParDoTest(unittest.TestCase):
 class TypeHintsTest(unittest.TestCase):
 
   def test_bad_types(self):
-    p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    p = beam.Pipeline('DirectRunner', argv=sys.argv)
     evens = None  # pylint: disable=unused-variable
 
     # [START type_hints_missing_define_numbers]
@@ -292,7 +292,7 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_off(self):
     # pylint: disable=expression-not-assigned
-    p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    p = beam.Pipeline('DirectRunner', argv=sys.argv)
     # [START type_hints_runtime_off]
     p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
     p.run()
@@ -300,7 +300,7 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_on(self):
     # pylint: disable=expression-not-assigned
-    p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    p = beam.Pipeline('DirectRunner', argv=sys.argv)
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_runtime_on]
       p.options.view_as(TypeOptions).runtime_type_check = True
@@ -309,7 +309,7 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     lines = (p | beam.Create(
         ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index cdf4e0c..ffbfed7 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -32,7 +32,7 @@ pipeline configuration::
   --staging_location gs://YOUR_STAGING_DIRECTORY
   --temp_location gs://YOUR_TEMP_DIRECTORY
   --job_name YOUR_JOB_NAME
-  --runner BlockingDataflowPipelineRunner
+  --runner BlockingDataflowRunner
 
 and an output prefix on GCS::
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 4073f7b..98477df 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -71,9 +71,9 @@ def run(argv=None):
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
   pipeline_args.extend([
-      # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
+      # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowRunner to
       # run your pipeline on the Google Cloud Dataflow Service.
-      '--runner=DirectPipelineRunner',
+      '--runner=DirectRunner',
       # CHANGE 3/5: Your project ID is required in order to run your pipeline on
       # the Google Cloud Dataflow Service.
       '--project=SET_YOUR_PROJECT_ID_HERE',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 75d00e0..cda9445 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -20,7 +20,7 @@ import re
 import unittest
 
 from apache_beam.utils.options import PipelineOptions
-from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.runners.dataflow_runner import DataflowRunner
 from apache_beam.internal import apiclient
 
 
@@ -31,7 +31,7 @@ class UtilTest(unittest.TestCase):
     pipeline_options = PipelineOptions()
     apiclient.DataflowApplicationClient(
         pipeline_options,
-        DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
 
   def test_default_job_name(self):
     job_name = apiclient.Job.default_job_name(None)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index e5db196..db940e3 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -325,16 +325,16 @@ class TestAvro(unittest.TestCase):
 
   def test_source_transform(self):
     path = self._write_data()
-    with beam.Pipeline('DirectPipelineRunner') as p:
+    with beam.Pipeline('DirectRunner') as p:
       assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS))
 
   def test_sink_transform(self):
     with tempfile.NamedTemporaryFile() as dst:
       path = dst.name
-      with beam.Pipeline('DirectPipelineRunner') as p:
+      with beam.Pipeline('DirectRunner') as p:
         # pylint: disable=expression-not-assigned
         p | beam.Create(self.RECORDS) | avroio.WriteToAvro(path, self.SCHEMA)
-      with beam.Pipeline('DirectPipelineRunner') as p:
+      with beam.Pipeline('DirectRunner') as p:
         # json used for stable sortability
         readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
         assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
@@ -344,13 +344,13 @@ class TestAvro(unittest.TestCase):
       import snappy  # pylint: disable=unused-variable
       with tempfile.NamedTemporaryFile() as dst:
         path = dst.name
-        with beam.Pipeline('DirectPipelineRunner') as p:
+        with beam.Pipeline('DirectRunner') as p:
           # pylint: disable=expression-not-assigned
           p | beam.Create(self.RECORDS) | avroio.WriteToAvro(
               path,
               self.SCHEMA,
               codec='snappy')
-        with beam.Pipeline('DirectPipelineRunner') as p:
+        with beam.Pipeline('DirectRunner') as p:
           # json used for stable sortability
           readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
           assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index e4df472..3ff2529 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -212,7 +212,7 @@ class ConcatSourceTest(unittest.TestCase):
                            RangeSource(10, 100),
                            RangeSource(100, 1000),
                           ])
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.Read(source)
     assert_that(pcoll, equal_to(range(1000)))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index ab7c69f..c93dc5a 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -364,7 +364,7 @@ class TestFileBasedSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def _run_dataflow_test(self, pattern, expected_data, splittable=True):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         pattern, splittable=splittable))
     assert_that(pcoll, equal_to(expected_data))
@@ -404,7 +404,7 @@ class TestFileBasedSource(unittest.TestCase):
     with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
@@ -419,7 +419,7 @@ class TestFileBasedSource(unittest.TestCase):
     with gzip.GzipFile(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
@@ -437,7 +437,7 @@ class TestFileBasedSource(unittest.TestCase):
       compressed_chunks.append(
           compressobj.compress('\n'.join(c)) + compressobj.flush())
     file_pattern = write_prepared_pattern(compressed_chunks)
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
@@ -456,7 +456,7 @@ class TestFileBasedSource(unittest.TestCase):
         f.write('\n'.join(c))
       compressed_chunks.append(out.getvalue())
     file_pattern = write_prepared_pattern(compressed_chunks)
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
@@ -471,7 +471,7 @@ class TestFileBasedSource(unittest.TestCase):
     with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -485,7 +485,7 @@ class TestFileBasedSource(unittest.TestCase):
     with gzip.GzipFile(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -504,7 +504,7 @@ class TestFileBasedSource(unittest.TestCase):
       compressed_chunks.append(out.getvalue())
     file_pattern = write_prepared_pattern(
         compressed_chunks, suffixes=['.gz']*len(chunks))
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         compression_type=fileio.CompressionTypes.AUTO))
@@ -526,7 +526,7 @@ class TestFileBasedSource(unittest.TestCase):
         chunks_to_write.append('\n'.join(c))
     file_pattern = write_prepared_pattern(chunks_to_write,
                                           suffixes=(['.gz', '']*3))
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         compression_type=fileio.CompressionTypes.AUTO))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index dbf0e69..35581eb 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -760,7 +760,7 @@ class TestNativeTextFileSink(unittest.TestCase):
       self.assertEqual(f.read().splitlines(), [])
 
   def test_write_dataflow(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -773,7 +773,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
@@ -788,7 +788,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression_unsharded(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
@@ -880,7 +880,7 @@ class TestFileSink(unittest.TestCase):
     temp_path = tempfile.NamedTemporaryFile().name
     sink = MyFileSink(
         temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     p.run()
     self.assertEqual(
@@ -894,7 +894,7 @@ class TestFileSink(unittest.TestCase):
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
 
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 35a13f4..f0f2046 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -98,7 +98,7 @@ class SourcesTest(unittest.TestCase):
 
   def test_run_direct(self):
     file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.Read(LineSource(file_name))
     assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 5784fef..39ddec4 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -264,7 +264,7 @@ class TextSourceTest(unittest.TestCase):
   def test_dataflow_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
@@ -279,7 +279,7 @@ class TextSourceTest(unittest.TestCase):
 
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(file_name, coder=DummyCoder())
     assert_that(pcoll, equal_to([record * 2 for record in expected_data]))
     pipeline.run()
@@ -287,7 +287,7 @@ class TextSourceTest(unittest.TestCase):
   def test_dataflow_file_pattern(self):
     pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4])
     assert len(expected_data) == 40
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(pattern)
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
@@ -299,7 +299,7 @@ class TextSourceTest(unittest.TestCase):
     with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
@@ -311,7 +311,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(file_name)
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
@@ -323,7 +323,7 @@ class TextSourceTest(unittest.TestCase):
     with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         compression_type=CompressionTypes.BZIP2)
@@ -337,7 +337,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         0, CompressionTypes.GZIP,
@@ -352,7 +352,7 @@ class TextSourceTest(unittest.TestCase):
     with gzip.GzipFile(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(
         file_name,
         0, CompressionTypes.GZIP,
@@ -385,7 +385,7 @@ class TextSourceTest(unittest.TestCase):
   def test_read_gzip_empty_file(self):
     filename = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template).name
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | 'Read' >> ReadFromText(
         filename,
         0, CompressionTypes.GZIP,
@@ -481,7 +481,7 @@ class TextSinkTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_write_dataflow(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path)  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -494,7 +494,7 @@ class TextSinkTest(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz')  # pylint: disable=expression-not-assigned
     pipeline.run()
@@ -507,7 +507,7 @@ class TextSinkTest(unittest.TestCase):
     self.assertEqual(read_result, self.lines)
 
   def test_write_dataflow_auto_compression_unsharded(self):
-    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pipeline = beam.Pipeline('DirectRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)
     pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='')  # pylint: disable=expression-not-assigned
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index b9e4daf..8b2345e 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -28,7 +28,7 @@ to be executed for each node visited is specified through a runner object.
 Typical usage:
 
   # Create a pipeline object using a local runner for execution.
-  pipeline = Pipeline(runner=DirectPipelineRunner())
+  pipeline = Pipeline(runner=DirectRunner())
 
   # Add to the pipeline a "Create" transform. When executed this
   # transform will produce a PCollection object with the specified values.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 5af4811..9b76129 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -61,7 +61,7 @@ class FakeSource(NativeSource):
 class PipelineTest(unittest.TestCase):
 
   def setUp(self):
-    self.runner_name = 'DirectPipelineRunner'
+    self.runner_name = 'DirectRunner'
 
   @staticmethod
   def custom_callable(pcoll):
@@ -202,7 +202,7 @@ class PipelineTest(unittest.TestCase):
     num_elements = 10
     num_maps = 100
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
 
     # Consumed memory should not be proportional to the number of maps.
     memory_threshold = (
@@ -231,7 +231,7 @@ class PipelineTest(unittest.TestCase):
         p | Create([ValueError]) | Map(raise_exception)
 
   def test_eager_pipeline(self):
-    p = Pipeline('EagerPipelineRunner')
+    p = Pipeline('EagerRunner')
     self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 323ca33..6748f17 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -39,12 +39,12 @@ class FakePipeline(Pipeline):
 class PValueTest(unittest.TestCase):
 
   def test_pvalue_expected_arguments(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     value = PValue(pipeline)
     self.assertEqual(pipeline, value.pipeline)
 
   def test_pcollectionview_not_recreated(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     value = pipeline | 'create1' >> Create([1, 2, 3])
     value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
     value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 0c2e9c5..4f34a3e 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -20,9 +20,9 @@
 This package defines runners, which are used to execute a pipeline.
 """
 
-from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
-from apache_beam.runners.direct.direct_runner import DirectPipelineRunner
-from apache_beam.runners.direct.direct_runner import EagerPipelineRunner
+from apache_beam.runners.dataflow_runner import DataflowRunner
+from apache_beam.runners.direct.direct_runner import DirectRunner
+from apache_beam.runners.direct.direct_runner import EagerRunner
 from apache_beam.runners.runner import create_runner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
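
With these re-exports, the string name used throughout the tests and the class itself resolve to the same runner; a small sketch, assuming the imports above are on the path (runner_test.py below asserts the same mapping):

    from apache_beam.runners import create_runner, DirectRunner

    # create_runner() maps the public string name onto the class.
    assert isinstance(create_runner('DirectRunner'), DirectRunner)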

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index faefd5e..392a166 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -45,11 +45,11 @@ from apache_beam.utils.options import StandardOptions
 from apache_beam.internal.clients import dataflow as dataflow_api
 
 
-def BlockingDataflowPipelineRunner(*args, **kwargs):
-  return DataflowPipelineRunner(*args, blocking=True, **kwargs)
+def BlockingDataflowRunner(*args, **kwargs):
+  return DataflowRunner(*args, blocking=True, **kwargs)
 
 
-class DataflowPipelineRunner(PipelineRunner):
+class DataflowRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
 
   Every execution of the run() method will submit an independent job for
@@ -162,13 +162,13 @@ class DataflowPipelineRunner(PipelineRunner):
     self.job = apiclient.Job(pipeline.options)
 
     # The superclass's run will trigger a traversal of all reachable nodes.
-    super(DataflowPipelineRunner, self).run(pipeline)
+    super(DataflowRunner, self).run(pipeline)
 
     standard_options = pipeline.options.view_as(StandardOptions)
     if standard_options.streaming:
-      job_version = DataflowPipelineRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
+      job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
     else:
-      job_version = DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
+      job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
 
     # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
@@ -180,7 +180,7 @@ class DataflowPipelineRunner(PipelineRunner):
 
     if self.result.has_job and self.blocking:
       thread = threading.Thread(
-          target=DataflowPipelineRunner.poll_for_job_completion,
+          target=DataflowRunner.poll_for_job_completion,
           args=(self, self.result.job_id()))
       # Mark the thread as a daemon thread so a keyboard interrupt on the main
       # thread will terminate everything. This is also the reason we will not
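
Note that BlockingDataflowRunner survives the rename as a thin factory function rather than a subclass, so isinstance() checks resolve to DataflowRunner (runner_test.py below relies on this). A standalone sketch of that pattern, with illustrative stand-in names rather than the Beam classes themselves:

    class Runner(object):
        def __init__(self, blocking=False):
            self.blocking = blocking

    def BlockingRunner(*args, **kwargs):
        # The blocking variant is just the base runner with a flag forced on.
        return Runner(*args, blocking=True, **kwargs)

    assert isinstance(BlockingRunner(), Runner)
    assert BlockingRunner().blocking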

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/__init__.py b/sdks/python/apache_beam/runners/direct/__init__.py
index 067f35a..0d64513 100644
--- a/sdks/python/apache_beam/runners/direct/__init__.py
+++ b/sdks/python/apache_beam/runners/direct/__init__.py
@@ -16,4 +16,4 @@
 #
 
 """Inprocess runner executes pipelines locally in a single process."""
-from apache_beam.runners.direct.direct_runner import DirectPipelineRunner
+from apache_beam.runners.direct.direct_runner import DirectRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index b87f7b1..6fc9d83 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -25,7 +25,7 @@ from apache_beam.io import Read
 from apache_beam.io import TextFileSource
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsList
-from apache_beam.runners.direct import DirectPipelineRunner
+from apache_beam.runners.direct import DirectRunner
 from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
 from apache_beam.transforms import CoGroupByKey
 from apache_beam.transforms import Create
@@ -42,7 +42,7 @@ from apache_beam.transforms import ParDo
 class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
   def setUp(self):
-    self.pipeline = Pipeline(DirectPipelineRunner())
+    self.pipeline = Pipeline(DirectRunner())
     self.visitor = ConsumerTrackingPipelineVisitor()
 
   def test_root_transforms(self):
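
The machinery this test drives is the generic Pipeline.visit() walk that DirectRunner itself uses; a hedged sketch with a trivial counting visitor (CountingVisitor is illustrative, not part of the commit):

    import apache_beam as beam
    from apache_beam.pipeline import Pipeline, PipelineVisitor
    from apache_beam.runners.direct import DirectRunner

    class CountingVisitor(PipelineVisitor):
        """Counts the transforms reachable from the pipeline's roots."""

        def __init__(self):
            self.transforms = 0

        def visit_transform(self, applied_ptransform):
            self.transforms += 1

    p = Pipeline(DirectRunner())
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)  # pylint: disable=expression-not-assigned
    visitor = CountingVisitor()
    p.visit(visitor)
    assert visitor.transforms > 0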

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index fa78902..523eb05 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -15,9 +15,9 @@
 # limitations under the License.
 #
 
-"""DirectPipelineRunner, executing on the local machine.
+"""DirectRunner, executing on the local machine.
 
-The DirectPipelineRunner is a runner implementation that executes the entire
+The DirectRunner is a runner implementation that executes the entire
 graph of transformations belonging to a pipeline on the local machine.
 """
 
@@ -33,7 +33,7 @@ from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
 
 
-class DirectPipelineRunner(PipelineRunner):
+class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
   def __init__(self):
@@ -52,7 +52,7 @@ class DirectPipelineRunner(PipelineRunner):
     from apache_beam.runners.direct.transform_evaluator import \
       TransformEvaluatorRegistry
 
-    logging.info('Running pipeline with DirectPipelineRunner.')
+    logging.info('Running pipeline with DirectRunner.')
     self.visitor = ConsumerTrackingPipelineVisitor()
     pipeline.visit(self.visitor)
 
@@ -153,6 +153,6 @@ class DirectPipelineResult(PipelineResult):
     return self._evaluation_context.get_aggregator_values(aggregator_or_name)
 
 
-class EagerPipelineRunner(DirectPipelineRunner):
+class EagerRunner(DirectRunner):
 
   is_eager = True
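
EagerRunner adds nothing beyond the is_eager flag; as test_eager_pipeline in pipeline_test.py above exercises, applying transforms under it yields materialized values directly. A minimal sketch:

    import apache_beam as beam

    p = beam.Pipeline('EagerRunner')
    # Under the eager runner the final transform's result is the
    # materialized list itself, not a deferred PCollection.
    result = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
    assert result == [1, 4, 9]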

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 660574b..48d353b 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -108,11 +108,11 @@ class EvaluationContext(object):
   """Evaluation context with the global state information of the pipeline.
 
   The evaluation context for a specific pipeline being executed by the
-  DirectPipelineRunner. Contains state shared within the execution across all
+  DirectRunner. Contains state shared within the execution across all
   transforms.
 
   EvaluationContext contains shared state for an execution of the
-  DirectPipelineRunner that can be used while evaluating a PTransform. This
+  DirectRunner that can be used while evaluating a PTransform. This
   consists of views into underlying state and watermark implementations, access
   to read and write PCollectionViews, and constructing counter sets and
   execution contexts. This includes executing callbacks asynchronously when

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index f138260..3558d41 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,10 +26,11 @@ import shutil
 import tempfile
 
 
-_KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'EagerPipelineRunner')
-_KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
-                           'BlockingDataflowPipelineRunner')
+_KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
+_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner', 'BlockingDataflowRunner')
 _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
+_ALL_KNOWN_RUNNERS = (
+  _KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS + _KNOWN_TEST_RUNNERS)
 
 
 def create_runner(runner_name):
@@ -37,8 +38,7 @@ def create_runner(runner_name):
 
   Args:
     runner_name: Name of the pipeline runner. Possible values are:
-      DirectPipelineRunner, DataflowPipelineRunner,
-      BlockingDataflowPipelineRunner and TestDataflowRunner.
+      DirectRunner, DataflowRunner and TestDataflowRunner.
 
   Returns:
     A runner object.
@@ -46,6 +46,15 @@ def create_runner(runner_name):
   Raises:
     RuntimeError: if an invalid runner name is used.
   """
+
+  # TODO(BEAM-1185): Remove when all references to PipelineRunners are gone.
+  if 'PipelineRunner' in runner_name:
+    new_runner_name = runner_name.replace('PipelineRunner', 'Runner')
+    if new_runner_name in _ALL_KNOWN_RUNNERS:
+      logging.warning(
+          '%s is deprecated, use %s instead.', runner_name, new_runner_name)
+      runner_name = new_runner_name
+
   if runner_name in _KNOWN_DIRECT_RUNNERS:
     runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name
   elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
@@ -60,9 +69,7 @@ def create_runner(runner_name):
     raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are %s '
         'or the fully qualified name of a PipelineRunner subclass.' % (
-            runner_name,
-            ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS +
-                      _KNOWN_TEST_RUNNERS)))
+            runner_name, ', '.join(_ALL_KNOWN_RUNNERS)))
 
 
 class PipelineRunner(object):
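
The deprecation shim added above is worth reading in isolation; a self-contained sketch of the same name-migration pattern (the helper name normalize_runner_name is illustrative):

    import logging

    _KNOWN = ('DirectRunner', 'EagerRunner', 'DataflowRunner',
              'BlockingDataflowRunner', 'TestDataflowRunner')

    def normalize_runner_name(runner_name):
        # Rewrite a legacy '*PipelineRunner' name to its new spelling and
        # log a warning, so existing callers keep working while migrating.
        if 'PipelineRunner' in runner_name:
            new_name = runner_name.replace('PipelineRunner', 'Runner')
            if new_name in _KNOWN:
                logging.warning('%s is deprecated, use %s instead.',
                                runner_name, new_name)
                return new_name
        return runner_name

    assert normalize_runner_name('DirectPipelineRunner') == 'DirectRunner'
    assert normalize_runner_name('DirectRunner') == 'DirectRunner'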

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index cc9450e..74c81c2 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -15,9 +15,9 @@
 # limitations under the License.
 #
 
-"""Unit tests for the PipelineRunner and DirectPipelineRunner classes.
+"""Unit tests for the PipelineRunner and DirectRunner classes.
 
-Note that PipelineRunner and DirectPipelineRunner functionality is tested in all
+Note that PipelineRunner and DirectRunner functionality is tested in all
 the other unit tests. In this file we choose to test only aspects related to
 caching and clearing values that are not tested elsewhere.
 """
@@ -31,8 +31,8 @@ import apache_beam as beam
 from apache_beam.internal import apiclient
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import create_runner
-from apache_beam.runners import DataflowPipelineRunner
-from apache_beam.runners import DirectPipelineRunner
+from apache_beam.runners import DataflowRunner
+from apache_beam.runners import DirectRunner
 from apache_beam.runners import TestDataflowRunner
 import apache_beam.transforms as ptransform
 from apache_beam.transforms.display import DisplayDataItem
@@ -50,20 +50,29 @@ class RunnerTest(unittest.TestCase):
 
   def test_create_runner(self):
     self.assertTrue(
-        isinstance(create_runner('DirectPipelineRunner'), DirectPipelineRunner))
+        isinstance(create_runner('DirectRunner'), DirectRunner))
     self.assertTrue(
-        isinstance(create_runner('DataflowPipelineRunner'),
-                   DataflowPipelineRunner))
+        isinstance(create_runner('DataflowRunner'),
+                   DataflowRunner))
     self.assertTrue(
-        isinstance(create_runner('BlockingDataflowPipelineRunner'),
-                   DataflowPipelineRunner))
+        isinstance(create_runner('BlockingDataflowRunner'),
+                   DataflowRunner))
     self.assertTrue(
         isinstance(create_runner('TestDataflowRunner'),
                    TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
+    # TODO(BEAM-1185): Remove when all references to PipelineRunners are gone.
+    self.assertTrue(
+        isinstance(create_runner('DirectPipelineRunner'), DirectRunner))
+    self.assertTrue(
+        isinstance(create_runner('DataflowPipelineRunner'),
+                   DataflowRunner))
+    self.assertTrue(
+        isinstance(create_runner('BlockingDataflowPipelineRunner'),
+                   DataflowRunner))
 
   def test_remote_runner_translation(self):
-    remote_runner = DataflowPipelineRunner()
+    remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,
                  options=PipelineOptions(self.default_properties))
 
@@ -71,10 +80,10 @@ class RunnerTest(unittest.TestCase):
      | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
      | 'gbk' >> ptransform.GroupByKey())
     remote_runner.job = apiclient.Job(p.options)
-    super(DataflowPipelineRunner, remote_runner).run(p)
+    super(DataflowRunner, remote_runner).run(p)
 
   def test_remote_runner_display_data(self):
-    remote_runner = DataflowPipelineRunner()
+    remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,
                  options=PipelineOptions(self.default_properties))
 
@@ -105,7 +114,7 @@ class RunnerTest(unittest.TestCase):
      | 'do' >> SpecialParDo(SpecialDoFn(), now))
 
     remote_runner.job = apiclient.Job(p.options)
-    super(DataflowPipelineRunner, remote_runner).run(p)
+    super(DataflowRunner, remote_runner).run(p)
     job_dict = json.loads(str(remote_runner.job))
     steps = [step
              for step in job_dict['steps']
@@ -127,7 +136,7 @@ class RunnerTest(unittest.TestCase):
     self.assertEqual(disp_data, expected_data)
 
   def test_no_group_by_key_directly_after_bigquery(self):
-    remote_runner = DataflowPipelineRunner()
+    remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,
                  options=PipelineOptions([
                      '--dataflow_endpoint=ignored',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index cc3d7c2..8cd818b 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -25,12 +25,12 @@ import tempfile
 
 import apache_beam as beam
 from apache_beam.pipeline import Pipeline
-from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.runners.dataflow_runner import DataflowRunner
 from apache_beam.utils.options import PipelineOptions
 from apache_beam.internal import apiclient
 
 
-class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
+class TemplatingDataflowRunnerTest(unittest.TestCase):
   """TemplatingDataflow tests."""
   def test_full_completion(self):
     # Create dummy file and close it.  Note that we need to do this because
@@ -42,7 +42,7 @@ class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
 
     dummy_dir = tempfile.mkdtemp()
 
-    remote_runner = DataflowPipelineRunner()
+    remote_runner = DataflowRunner()
     pipeline = Pipeline(remote_runner,
                         options=PipelineOptions([
                             '--dataflow_endpoint=ignored',
@@ -67,7 +67,7 @@ class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
 
   def test_bad_path(self):
     dummy_sdk_file = tempfile.NamedTemporaryFile()
-    remote_runner = DataflowPipelineRunner()
+    remote_runner = DataflowRunner()
     pipeline = Pipeline(remote_runner,
                         options=PipelineOptions([
                             '--dataflow_endpoint=ignored',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 5794380..5f67113 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -41,7 +41,7 @@ class TestPipeline(Pipeline):
   For example, use the following command line to execute all ValidatesRunner tests::
 
     python setup.py nosetests -a ValidatesRunner \
-        --test-pipeline-options="--runner=DirectPipelineRunner \
+        --test-pipeline-options="--runner=DirectRunner \
                                  --job_name=myJobName \
                                  --num_workers=1"
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index 040819e..e77dfba 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -63,7 +63,7 @@ class AggregatorTest(unittest.TestCase):
         for a in aggregators:
           context.aggregate_to(a, context.element)
 
-    p = beam.Pipeline('DirectPipelineRunner')
+    p = beam.Pipeline('DirectRunner')
     p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: disable=expression-not-assigned
     res = p.run()
     for (_, _, expected), a in zip(counter_types, aggregators):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 6113ea2..72fce60 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -40,7 +40,7 @@ class CombineTest(unittest.TestCase):
     combine.TopCombineFn._MIN_BUFFER_OVERSIZE = 1
 
   def test_builtin_combines(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
 
     vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     mean = sum(vals) / float(len(vals))
@@ -62,7 +62,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_top(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
 
     # A parameter we'll be sharing with a custom comparator.
     names = {0: 'zo',
@@ -201,7 +201,7 @@ class CombineTest(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_top_shorthands(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
 
     pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
     result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
@@ -222,7 +222,7 @@ class CombineTest(unittest.TestCase):
 
     # First test global samples (lots of them).
     for ix in xrange(300):
-      pipeline = Pipeline('DirectPipelineRunner')
+      pipeline = Pipeline('DirectRunner')
       pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
@@ -241,7 +241,7 @@ class CombineTest(unittest.TestCase):
       pipeline.run()
 
     # Now test per-key samples.
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start-perkey' >> Create(
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
@@ -258,7 +258,7 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_tuple_combine_fn(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = (
         p
         | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
@@ -269,7 +269,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_tuple_combine_fn_without_defaults(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = (
         p
         | Create([1, 1, 2, 3])
@@ -280,7 +280,7 @@ class CombineTest(unittest.TestCase):
     p.run()
 
   def test_to_list_and_to_dict(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     pcoll = pipeline | 'start' >> Create(the_list)
     result = pcoll | 'to list' >> combine.ToList()
@@ -292,7 +292,7 @@ class CombineTest(unittest.TestCase):
     assert_that(result, matcher([the_list]))
     pipeline.run()
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pairs = [(1, 2), (3, 4), (5, 6)]
     pcoll = pipeline | 'start-pairs' >> Create(pairs)
     result = pcoll | 'to dict' >> combine.ToDict()
@@ -306,12 +306,12 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_combine_globally_with_default(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
     p.run()
 
   def test_combine_globally_without_default(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = p | Create([]) | CombineGlobally(sum).without_defaults()
     assert_that(result, equal_to([]))
     p.run()
@@ -323,7 +323,7 @@ class CombineTest(unittest.TestCase):
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
 
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
     result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 1bd7fb4..39864a6 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -413,7 +413,7 @@ class PTransform(WithTypeHints, HasDisplayData):
       from apache_beam.utils.options import PipelineOptions
       # pylint: enable=wrong-import-order, wrong-import-position
       p = pipeline.Pipeline(
-          'DirectPipelineRunner', PipelineOptions(sys.argv))
+          'DirectRunner', PipelineOptions(sys.argv))
     else:
       if not pipelines:
         if self.pipeline is not None:
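
The branch patched here backs applying a transform straight to plain Python values: an implicit 'DirectRunner' pipeline is constructed from sys.argv, run, and its result materialized. A hedged sketch of that convenience, assuming this code path's materializing behavior:

    import apache_beam as beam

    # Applying a transform to a plain list builds and runs an implicit
    # DirectRunner pipeline behind the scenes.
    result = [1, 2, 3] | 'TimesTen' >> beam.Map(lambda x: x * 10)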

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 9118fee..5ed7d72 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,12 +54,12 @@ class PTransformTest(unittest.TestCase):
     self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
                      str(PTransform()))
 
-    pa = Pipeline('DirectPipelineRunner')
+    pa = Pipeline('DirectRunner')
     res = pa | 'a_label' >> beam.Create([1, 2])
     self.assertEqual('AppliedPTransform(a_label, Create)',
                      str(res.producer))
 
-    pc = Pipeline('DirectPipelineRunner')
+    pc = Pipeline('DirectRunner')
     res = pc | beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
@@ -67,7 +67,7 @@ class PTransformTest(unittest.TestCase):
         """<Create(PTransform) label=[Create] inputs=('ci',)>""",
         str(inputs_tr))
 
-    pd = Pipeline('DirectPipelineRunner')
+    pd = Pipeline('DirectRunner')
     res = pd | beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
@@ -111,7 +111,7 @@ class PTransformTest(unittest.TestCase):
       def process(self, context, addon):
         return [context.element + addon]
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
@@ -123,20 +123,20 @@ class PTransformTest(unittest.TestCase):
       def process(self, context):
         pass
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
       pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -145,7 +145,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_side_input_as_keyword_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     side = pipeline | 'side' >> beam.Create([10])
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
@@ -154,7 +154,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
 
@@ -168,7 +168,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
     pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
 
@@ -182,7 +182,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_with_side_outputs_maintains_unique_name(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
     r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
@@ -195,7 +195,7 @@ class PTransformTest(unittest.TestCase):
     # iterable.
     def incorrect_par_do_fn(x):
       return x + 5
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
     pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions to a ParDo return
@@ -216,7 +216,7 @@ class PTransformTest(unittest.TestCase):
 
       def finish_bundle(self, c):
         yield 'finish'
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | 'do' >> beam.ParDo(MyDoFn())
 
@@ -231,7 +231,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_filter(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
     result = pcoll | beam.Filter(
         'filter', lambda x: x % 2 == 0)
@@ -257,7 +257,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
@@ -265,7 +265,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
@@ -273,7 +273,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(values)
     divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombineGlobally(
@@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_combine_fn(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
@@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_callable(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
@@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase):
   def test_combine_per_key_with_side_input_as_arg(self):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     divisor = pipeline | 'divisor' >> beam.Create([2])
@@ -322,7 +322,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
     result = pcoll | 'group' >> beam.GroupByKey()
@@ -336,7 +336,7 @@ class PTransformTest(unittest.TestCase):
       def partition_for(self, context, num_partitions, offset):
         return (context.element % 3) + offset
 
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
     partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
@@ -347,15 +347,15 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
     # Check that a bad partition label will yield an error. For the
-    # DirectPipelineRunner, this error manifests as an exception.
-    pipeline = Pipeline('DirectPipelineRunner')
+    # DirectRunner, this error manifests as an exception.
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
@@ -370,7 +370,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_partition_followed_by_flatten_and_groupbykey(self):
     """Regression test for an issue with how partitions are handled."""
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     contents = [('aa', 1), ('bb', 2), ('aa', 2)]
     created = pipeline | 'A' >> beam.Create(contents)
     partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
@@ -380,7 +380,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
@@ -388,7 +388,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     with self.assertRaises(ValueError):
       () | 'pipeline arg missing' >> beam.Flatten()
     result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
@@ -396,7 +396,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
     pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
@@ -417,7 +417,7 @@ class PTransformTest(unittest.TestCase):
       set([1, 2, 3]) | 'flatten' >> beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -429,7 +429,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_iterable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -442,7 +442,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_co_group_by_key_on_dict(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll_1 = pipeline | beam.Create(
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
@@ -454,7 +454,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_group_by_key_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
 
     with self.assertRaises(typehints.TypeCheckError) as e:
@@ -467,7 +467,7 @@ class PTransformTest(unittest.TestCase):
         'Tuple[TypeVariable[K], TypeVariable[V]]')
 
   def test_group_by_key_only_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
       pcolls | 'D' >> beam.GroupByKeyOnly()
@@ -478,7 +478,7 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_keys_and_values(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | beam.Create(
         'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
     keys = pcoll.apply('keys', beam.Keys())
@@ -488,7 +488,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_kv_swap(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | beam.Create(
         'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
     result = pcoll.apply('swap', beam.KvSwap())
@@ -496,7 +496,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_remove_duplicates(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | beam.Create(
         'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
     result = pcoll.apply('nodupes', beam.RemoveDuplicates())
@@ -504,7 +504,7 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_chained_ptransforms(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
@@ -581,7 +581,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_chained_ptransforms(self):
     """Tests that chaining gets proper nesting."""
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     map1 = beam.Map('map1', lambda x: (x, 1))
     gbk = beam.GroupByKey('gbk')
     map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
@@ -594,7 +594,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_without_label(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
@@ -604,7 +604,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_custom_transform_with_label(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform('*custom*')
     result = pipeline.apply(custom, pcoll)
@@ -615,7 +615,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
@@ -624,7 +624,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_ptransform_using_decorator(self):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     sample = SamplePTransform('*sample*')
     _ = pcoll | sample
@@ -635,7 +635,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally('*sum*', sum)
     result = pcoll | combine
@@ -644,7 +644,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline.run()
 
   def check_label(self, ptransform, expected_label):
-    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline = Pipeline('DirectRunner')
     pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
     actual_label = sorted(pipeline.applied_labels - {'start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index caf3652..a327dc8 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -28,7 +28,7 @@ from apache_beam.transforms.util import assert_that, equal_to
 class SideInputsTest(unittest.TestCase):
 
   def create_pipeline(self):
-    return beam.Pipeline('DirectPipelineRunner')
+    return beam.Pipeline('DirectRunner')
 
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68dd9b5e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index c37d4ae..fd34f4c 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -379,10 +379,11 @@ class TriggerTest(unittest.TestCase):
       self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value,
                        range(10))
 
+
 class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):
-    p = Pipeline('DirectPipelineRunner')
+    p = Pipeline('DirectRunner')
     result = (p
               | beam.Create([1, 2, 3, 4, 5, 10, 11])
               | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
@@ -492,13 +493,12 @@ class TranscriptTest(unittest.TestCase):
 
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms import window as window_module
-    from apache_beam.transforms import trigger as trigger_module
     # pylint: enable=wrong-import-order, wrong-import-position
     window_fn_names = dict(window_module.__dict__)
     window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn':
                             CustomTimestampingFixedWindowsWindowFn})
     trigger_names = {'Default': DefaultTrigger}
-    trigger_names.update(trigger_module.__dict__)
+    trigger_names.update(trigger.__dict__)
 
     window_fn = parse_fn(spec.get('window_fn', 'GlobalWindows'),
                          window_fn_names)



[4/4] incubator-beam git commit: Closes #1685

Posted by ro...@apache.org.
Closes #1685


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c37274d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c37274d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c37274d

Branch: refs/heads/python-sdk
Commit: 9c37274dd77362e09f12efe27f69b40c6a32519c
Parents: 82d7f0f fd5bf2c
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Dec 22 20:22:36 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Dec 22 20:22:36 2016 -0800

----------------------------------------------------------------------
 sdks/python/README.md                           | 16 ++--
 .../examples/complete/autocomplete_test.py      |  2 +-
 .../examples/complete/estimate_pi_test.py       |  2 +-
 .../examples/complete/juliaset/juliaset_main.py |  2 +-
 .../apache_beam/examples/complete/tfidf_test.py |  2 +-
 .../examples/complete/top_wikipedia_sessions.py |  4 +-
 .../complete/top_wikipedia_sessions_test.py     |  2 +-
 .../cookbook/bigquery_side_input_test.py        |  2 +-
 .../cookbook/bigquery_tornadoes_test.py         |  2 +-
 .../examples/cookbook/coders_test.py            |  2 +-
 .../examples/cookbook/combiners_test.py         |  4 +-
 .../examples/cookbook/custom_ptransform_test.py |  2 +-
 .../examples/cookbook/filters_test.py           |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 10 +--
 .../examples/snippets/snippets_test.py          | 10 +--
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 .../apache_beam/examples/wordcount_minimal.py   |  4 +-
 .../apache_beam/internal/apiclient_test.py      |  4 +-
 sdks/python/apache_beam/io/avroio_test.py       | 10 +--
 .../python/apache_beam/io/concat_source_test.py |  2 +-
 .../apache_beam/io/filebasedsource_test.py      | 18 ++--
 sdks/python/apache_beam/io/fileio_test.py       | 10 +--
 sdks/python/apache_beam/io/sources_test.py      |  2 +-
 sdks/python/apache_beam/io/textio_test.py       | 24 +++---
 sdks/python/apache_beam/pipeline.py             |  2 +-
 sdks/python/apache_beam/pipeline_test.py        |  6 +-
 sdks/python/apache_beam/pvalue_test.py          |  4 +-
 sdks/python/apache_beam/runners/__init__.py     |  6 +-
 .../apache_beam/runners/dataflow_runner.py      | 14 +--
 .../apache_beam/runners/direct/__init__.py      |  2 +-
 .../consumer_tracking_pipeline_visitor_test.py  |  4 +-
 .../apache_beam/runners/direct/direct_runner.py | 10 +--
 .../runners/direct/evaluation_context.py        |  4 +-
 sdks/python/apache_beam/runners/runner.py       | 28 ++++--
 sdks/python/apache_beam/runners/runner_test.py  | 37 +++++---
 .../apache_beam/runners/template_runner_test.py |  8 +-
 .../runners/test/test_dataflow_runner.py        |  4 +-
 sdks/python/apache_beam/test_pipeline.py        |  2 +-
 .../apache_beam/transforms/aggregator_test.py   |  2 +-
 .../apache_beam/transforms/combiners_test.py    | 24 +++---
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   | 90 ++++++++++----------
 .../apache_beam/transforms/sideinputs_test.py   |  2 +-
 .../apache_beam/transforms/trigger_test.py      |  6 +-
 .../apache_beam/transforms/window_test.py       |  8 +-
 sdks/python/apache_beam/utils/options.py        |  8 +-
 .../utils/pipeline_options_validator.py         |  4 +-
 .../utils/pipeline_options_validator_test.py    | 22 ++---
 sdks/python/run_postcommit.sh                   |  4 +-
 50 files changed, 233 insertions(+), 212 deletions(-)
----------------------------------------------------------------------