You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/14 21:16:32 UTC
[1/3] incubator-beam git commit: Accept runners by fully qualified name.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 762a2930a -> 3b6950689
Accept runners by fully qualified name.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954
Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:15:59 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/runner.py | 28 ++++++++++-----------
sdks/python/apache_beam/runners/runner_test.py | 2 +-
2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
RuntimeError: if an invalid runner name is used.
"""
# pylint: disable=wrong-import-order, wrong-import-position
- if runner_name == 'DirectPipelineRunner':
- import apache_beam.runners.direct_runner
- return apache_beam.runners.direct_runner.DirectPipelineRunner()
- if runner_name == 'DiskCachedPipelineRunner':
- import apache_beam.runners.direct_runner
- return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
- )
- if runner_name == 'EagerPipelineRunner':
- import apache_beam.runners.direct_runner
- return apache_beam.runners.direct_runner.EagerPipelineRunner()
- elif runner_name in ('DataflowPipelineRunner',
- 'BlockingDataflowPipelineRunner'):
+ if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+ 'EagerPipelineRunner'):
+ runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+ if runner_name in ('DataflowPipelineRunner',
+ 'BlockingDataflowPipelineRunner'):
import apache_beam.runners.dataflow_runner
return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
blocking=runner_name == 'BlockingDataflowPipelineRunner')
+ elif '.' in runner_name:
+ module, runner = runner_name.rsplit('.', 1)
+ return getattr(__import__(module, {}, {}, [runner], -1), runner)()
else:
- raise RuntimeError(
+ raise ValueError(
'Unexpected pipeline runner: %s. Valid values are '
- 'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
- 'BlockingDataflowPipelineRunner.' % runner_name)
+ 'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+ 'BlockingDataflowPipelineRunner or the fully qualified name of '
+ 'a PipelineRunner subclass.' % runner_name)
class PipelineRunner(object):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
self.assertTrue(
isinstance(create_runner('BlockingDataflowPipelineRunner'),
DataflowPipelineRunner))
- self.assertRaises(RuntimeError, create_runner, 'xyz')
+ self.assertRaises(ValueError, create_runner, 'xyz')
def test_remote_runner_translation(self):
remote_runner = DataflowPipelineRunner()
[2/3] incubator-beam git commit: Cleanup known runners code.
Posted by ro...@apache.org.
Cleanup known runners code.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c055e845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c055e845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c055e845
Branch: refs/heads/python-sdk
Commit: c055e845b11e8a89e26f968c11f7cb8f3943f0d5
Parents: 84fe895
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 11:56:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:16:08 2016 -0700
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 4 +++
sdks/python/apache_beam/runners/runner.py | 27 ++++++++++----------
2 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 5a3f6a5..45bfb6e 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -44,6 +44,10 @@ from apache_beam.utils.options import StandardOptions
from apache_beam.internal.clients import dataflow as dataflow_api
+def BlockingDataflowPipelineRunner(*args, **kwargs):
+ return DataflowPipelineRunner(*args, blocking=True, **kwargs)
+
+
class DataflowPipelineRunner(PipelineRunner):
"""A runner that creates job graphs and submits them for remote execution.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c055e845/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 98f9758..3a77766 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,6 +26,12 @@ import shutil
import tempfile
+_KNOWN_DIRECT_RUNNERS = ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+ 'EagerPipelineRunner')
+_KNOWN_DATAFLOW_RUNNERS = ('DataflowPipelineRunner',
+ 'BlockingDataflowPipelineRunner')
+
+
def create_runner(runner_name):
"""Creates a runner instance from a runner class name.
@@ -40,25 +46,20 @@ def create_runner(runner_name):
Raises:
RuntimeError: if an invalid runner name is used.
"""
- # pylint: disable=wrong-import-order, wrong-import-position
- if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
- 'EagerPipelineRunner'):
+ if runner_name in _KNOWN_DIRECT_RUNNERS:
runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+ elif runner_name in _KNOWN_DATAFLOW_RUNNERS:
+ runner_name = 'apache_beam.runners.dataflow_runner.' + runner_name
- if runner_name in ('DataflowPipelineRunner',
- 'BlockingDataflowPipelineRunner'):
- import apache_beam.runners.dataflow_runner
- return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
- blocking=runner_name == 'BlockingDataflowPipelineRunner')
- elif '.' in runner_name:
+ if '.' in runner_name:
module, runner = runner_name.rsplit('.', 1)
return getattr(__import__(module, {}, {}, [runner], -1), runner)()
else:
raise ValueError(
- 'Unexpected pipeline runner: %s. Valid values are '
- 'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
- 'BlockingDataflowPipelineRunner or the fully qualified name of '
- 'a PipelineRunner subclass.' % runner_name)
+ 'Unexpected pipeline runner: %s. Valid values are %s '
+ 'or the fully qualified name of a PipelineRunner subclass.' % (
+ runner_name,
+ ', '.join(_KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS)))
class PipelineRunner(object):
[3/3] incubator-beam git commit: Closes #657
Posted by ro...@apache.org.
Closes #657
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b695068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b695068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b695068
Branch: refs/heads/python-sdk
Commit: 3b69506897db7f21cc09eaa5ac8c08e4ee15ad2c
Parents: 762a293 c055e84
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 14 14:16:09 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 14 14:16:09 2016 -0700
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 4 ++
sdks/python/apache_beam/runners/runner.py | 39 ++++++++++----------
sdks/python/apache_beam/runners/runner_test.py | 2 +-
3 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------