You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/14 21:16:33 UTC

[2/3] incubator-beam git commit: Clean up known runners code.

Clean up known runners code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c055e845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c055e845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c055e845

Branch: refs/heads/python-sdk
Commit: c055e845b11e8a89e26f968c11f7cb8f3943f0d5
Parents: 84fe895
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 11:56:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:16:08 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  4 +++
 sdks/python/apache_beam/runners/runner.py       | 27 ++++++++++----------
 2 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 5a3f6a5..45bfb6e 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -44,6 +44,10 @@ from apache_beam.utils.options import StandardOptions
 from apache_beam.internal.clients import dataflow as dataflow_api
 
 
+def BlockingDataflowPipelineRunner(*args, **kwargs):
+  return DataflowPipelineRunner(*args, blocking=True, **kwargs)
+
+
 class DataflowPipelineRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 98f9758..3a77766 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,6 +26,12 @@ import shutil
 import tempfile
 
 
+_KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                         'EagerPipelineRunner')
+_KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
+                           'BlockingDataflowPipelineRunner')
+
+
 def create_runner(runner_name):
   """Creates a runner instance from a runner class name.
 
@@ -40,25 +46,20 @@ def create_runner(runner_name):
   Raises:
     RuntimeError: if an invalid runner name is used.
   """
-  # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
-                     'EagerPipelineRunner'):
+  if runner_name in _KNOWN_DIRECT_RUNNERS:
     runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+  elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
+    runner_name = 'apache_beam.runners.dataflow_runner.' + runner_name
 
-  if runner_name in ('DataflowPipelineRunner',
-                     'BlockingDataflowPipelineRunner'):
-    import apache_beam.runners.dataflow_runner
-    return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
-        blocking=runner_name == 'BlockingDataflowPipelineRunner')
-  elif '.' in runner_name:
+  if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
     return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
     raise ValueError(
-        'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
-        'BlockingDataflowPipelineRunner or the fully qualified name of '
-        'a PipelineRunner subclass.' % runner_name)
+        'Unexpected pipeline runner: %s. Valid values are %s '
+        'or the fully qualified name of a PipelineRunner subclass.' % (
+            runner_name,
+            ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS)))
 
 
 class PipelineRunner(object):