You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/19 18:01:36 UTC
[2/2] beam git commit: Add dry run option to DataflowRunner
Add dry run option to DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2683e87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2683e87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2683e87
Branch: refs/heads/master
Commit: c2683e876bea541684977ded9e179dd0e1a8ccdf
Parents: 1fb4304
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri Jun 16 17:18:57 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Jun 19 11:01:30 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/options/pipeline_options.py | 5 +++++
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++++++-
.../apache_beam/runners/dataflow/dataflow_runner_test.py | 9 ++++-----
3 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 8644e51..dab8ff2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -605,6 +605,11 @@ class TestOptions(PipelineOptions):
help=('Verify state/output of e2e test pipeline. This is pickled '
'version of the matcher which should extends '
'hamcrest.core.base_matcher.BaseMatcher.'))
+ parser.add_argument(
+ '--dry_run',
+ default=False,
+ help=('Used in unit testing runners without submitting the '
+ 'actual job.'))
def validate(self, validator):
errors = []
http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ce46ea9..9395f16 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -48,6 +48,7 @@ from apache_beam.transforms.display import DisplayData
from apache_beam.typehints import typehints
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import TestOptions
from apache_beam.utils.plugin import BeamPlugin
@@ -228,7 +229,6 @@ class DataflowRunner(PipelineRunner):
return FlattenInputVisitor()
- # TODO(mariagh): Make this method take pipepline_options
def run(self, pipeline):
"""Remotely executes entire pipeline or parts reachable from node."""
# Import here to avoid adding the dependency for local running scenarios.
@@ -263,6 +263,11 @@ class DataflowRunner(PipelineRunner):
# The superclass's run will trigger a traversal of all reachable nodes.
super(DataflowRunner, self).run(pipeline)
+ test_options = pipeline._options.view_as(TestOptions)
+ # If it is a dry run, return without submitting the job.
+ if test_options.dry_run:
+ return None
+
standard_options = pipeline._options.view_as(StandardOptions)
if standard_options.streaming:
job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 819d471..6cc5814 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -59,7 +59,8 @@ class DataflowRunnerTest(unittest.TestCase):
'--project=test-project',
'--staging_location=ignored',
'--temp_location=/dev/null',
- '--no_auth=True']
+ '--no_auth=True',
+ '--dry_run=True']
@mock.patch('time.sleep', return_value=None)
def test_wait_until_finish(self, patched_time_sleep):
@@ -108,8 +109,7 @@ class DataflowRunnerTest(unittest.TestCase):
(p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
- remote_runner.job = apiclient.Job(p._options)
- super(DataflowRunner, remote_runner).run(p)
+ p.run()
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
@@ -160,8 +160,7 @@ class DataflowRunnerTest(unittest.TestCase):
(p | ptransform.Create([1, 2, 3, 4, 5])
| 'Do' >> SpecialParDo(SpecialDoFn(), now))
- remote_runner.job = apiclient.Job(p._options)
- super(DataflowRunner, remote_runner).run(p)
+ p.run()
job_dict = json.loads(str(remote_runner.job))
steps = [step
for step in job_dict['steps']