You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/19 18:01:36 UTC

[2/2] beam git commit: Add dry run option to DataflowRunner

Add dry run option to DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2683e87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2683e87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2683e87

Branch: refs/heads/master
Commit: c2683e876bea541684977ded9e179dd0e1a8ccdf
Parents: 1fb4304
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri Jun 16 17:18:57 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Jun 19 11:01:30 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py         | 5 +++++
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++++++-
 .../apache_beam/runners/dataflow/dataflow_runner_test.py    | 9 ++++-----
 3 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 8644e51..dab8ff2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -605,6 +605,11 @@ class TestOptions(PipelineOptions):
         help=('Verify state/output of e2e test pipeline. This is pickled '
               'version of the matcher which should extends '
               'hamcrest.core.base_matcher.BaseMatcher.'))
+    parser.add_argument(
+        '--dry_run',
+        default=False,
+        help=('Used in unit testing runners without submitting the '
+              'actual job.'))
 
   def validate(self, validator):
     errors = []

http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ce46ea9..9395f16 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -48,6 +48,7 @@ from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.utils.plugin import BeamPlugin
 
 
@@ -228,7 +229,6 @@ class DataflowRunner(PipelineRunner):
 
     return FlattenInputVisitor()
 
-  # TODO(mariagh): Make this method take pipepline_options
   def run(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
@@ -263,6 +263,11 @@ class DataflowRunner(PipelineRunner):
     # The superclass's run will trigger a traversal of all reachable nodes.
     super(DataflowRunner, self).run(pipeline)
 
+    test_options = pipeline._options.view_as(TestOptions)
+    # If it is a dry run, return without submitting the job.
+    if test_options.dry_run:
+      return None
+
     standard_options = pipeline._options.view_as(StandardOptions)
     if standard_options.streaming:
       job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION

http://git-wip-us.apache.org/repos/asf/beam/blob/c2683e87/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 819d471..6cc5814 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -59,7 +59,8 @@ class DataflowRunnerTest(unittest.TestCase):
       '--project=test-project',
       '--staging_location=ignored',
       '--temp_location=/dev/null',
-      '--no_auth=True']
+      '--no_auth=True',
+      '--dry_run=True']
 
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
@@ -108,8 +109,7 @@ class DataflowRunnerTest(unittest.TestCase):
     (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
      | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
      | ptransform.GroupByKey())
-    remote_runner.job = apiclient.Job(p._options)
-    super(DataflowRunner, remote_runner).run(p)
+    p.run()
 
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
@@ -160,8 +160,7 @@ class DataflowRunnerTest(unittest.TestCase):
     (p | ptransform.Create([1, 2, 3, 4, 5])
      | 'Do' >> SpecialParDo(SpecialDoFn(), now))
 
-    remote_runner.job = apiclient.Job(p._options)
-    super(DataflowRunner, remote_runner).run(p)
+    p.run()
     job_dict = json.loads(str(remote_runner.job))
     steps = [step
              for step in job_dict['steps']