You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/14 21:16:32 UTC

[1/3] incubator-beam git commit: Accept runners by fully qualified name.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 762a2930a -> 3b6950689


Accept runners by fully qualified name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954

Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:15:59 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py      | 28 ++++++++++-----------
 sdks/python/apache_beam/runners/runner_test.py |  2 +-
 2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
     RuntimeError: if an invalid runner name is used.
   """
   # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name == 'DirectPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
+  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                     'EagerPipelineRunner'):
+    runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+  if runner_name in ('DataflowPipelineRunner',
+                     'BlockingDataflowPipelineRunner'):
     import apache_beam.runners.dataflow_runner
     return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
         blocking=runner_name == 'BlockingDataflowPipelineRunner')
+  elif '.' in runner_name:
+    module, runner = runner_name.rsplit('.', 1)
+    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
-    raise RuntimeError(
+    raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
-        'BlockingDataflowPipelineRunner.' % runner_name)
+        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+        'BlockingDataflowPipelineRunner or the fully qualified name of '
+        'a PipelineRunner subclass.' % runner_name)
 
 
 class PipelineRunner(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
+    self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()


[2/3] incubator-beam git commit: Cleanup known runners code.

Posted by ro...@apache.org.
Cleanup known runners code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c055e845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c055e845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c055e845

Branch: refs/heads/python-sdk
Commit: c055e845b11e8a89e26f968c11f7cb8f3943f0d5
Parents: 84fe895
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 11:56:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:16:08 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  4 +++
 sdks/python/apache_beam/runners/runner.py       | 27 ++++++++++----------
 2 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 5a3f6a5..45bfb6e 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -44,6 +44,10 @@ from apache_beam.utils.options import StandardOptions
 from apache_beam.internal.clients import dataflow as dataflow_api
 
 
+def BlockingDataflowPipelineRunner(*args, **kwargs):
+  return DataflowPipelineRunner(*args, blocking=True, **kwargs)
+
+
 class DataflowPipelineRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 98f9758..3a77766 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,6 +26,12 @@ import shutil
 import tempfile
 
 
+_KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                         'EagerPipelineRunner')
+_KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
+                           'BlockingDataflowPipelineRunner')
+
+
 def create_runner(runner_name):
   """Creates a runner instance from a runner class name.
 
@@ -40,25 +46,20 @@ def create_runner(runner_name):
   Raises:
     RuntimeError: if an invalid runner name is used.
   """
-  # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
-                     'EagerPipelineRunner'):
+  if runner_name in _KNOWN_DIRECT_RUNNERS:
     runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+  elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
+    runner_name = 'apache_beam.runners.dataflow_runner.' + runner_name
 
-  if runner_name in ('DataflowPipelineRunner',
-                     'BlockingDataflowPipelineRunner'):
-    import apache_beam.runners.dataflow_runner
-    return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
-        blocking=runner_name == 'BlockingDataflowPipelineRunner')
-  elif '.' in runner_name:
+  if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
     return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
     raise ValueError(
-        'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
-        'BlockingDataflowPipelineRunner or the fully qualified name of '
-        'a PipelineRunner subclass.' % runner_name)
+        'Unexpected pipeline runner: %s. Valid values are %s '
+        'or the fully qualified name of a PipelineRunner subclass.' % (
+            runner_name,
+            ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS)))
 
 
 class PipelineRunner(object):


[3/3] incubator-beam git commit: Closes #657

Posted by ro...@apache.org.
Closes #657


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b695068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b695068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b695068

Branch: refs/heads/python-sdk
Commit: 3b69506897db7f21cc09eaa5ac8c08e4ee15ad2c
Parents: 762a293 c055e84
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 14:16:09 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:16:09 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  4 ++
 sdks/python/apache_beam/runners/runner.py       | 39 ++++++++++----------
 sdks/python/apache_beam/runners/runner_test.py  |  2 +-
 3 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------