You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/05 19:04:58 UTC

[1/2] incubator-beam git commit: Modify create_job to allow staging the job and not submitting it to the service.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7c5e4aa66 -> 0d99856f3


Modify create_job to allow staging the job and not submitting it to the service.

- Modularize create_job into three steps: create the job description, stage the job, and submit it for execution.
- Modify --dataflow_job_file to stage the job and then still submit it to the service.
- Add --template_location to stage the job but not submit it to the service.
- Add tests for both options, including a check that they are mutually exclusive (following the Java SDK's decision).
- Add template_runner_test.py with integration tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfa0ad81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfa0ad81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfa0ad81

Branch: refs/heads/python-sdk
Commit: cfa0ad8136b323bade9de14ea6149e7f74cbd0b4
Parents: 7c5e4aa
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Wed Nov 2 09:14:48 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Dec 5 11:04:34 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/wordcount.py   |  1 -
 sdks/python/apache_beam/internal/apiclient.py   | 34 +++++++-
 .../apache_beam/runners/dataflow_runner.py      | 13 ++-
 sdks/python/apache_beam/template_runner_test.py | 83 ++++++++++++++++++++
 sdks/python/apache_beam/utils/options.py        | 10 +++
 .../apache_beam/utils/pipeline_options_test.py  | 13 +++
 .../utils/pipeline_options_validator_test.py    | 28 +++++++
 7 files changed, 175 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 096e508..7f347d8 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -59,7 +59,6 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-
   parser = argparse.ArgumentParser()
   parser.add_argument('--input',
                       dest='input',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 5612631..a894557 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -24,6 +24,7 @@ import os
 import re
 import time
 
+from StringIO import StringIO
 
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
@@ -42,7 +43,6 @@ from apache_beam.utils.options import DebugOptions
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
 from apache_beam.utils.options import WorkerOptions
-
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
@@ -327,6 +327,9 @@ class Job(object):
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
+  def json(self):
+    return encoding.MessageToJson(self.proto)
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query jobs."""
@@ -392,8 +395,29 @@ class DataflowApplicationClient(object):
   # TODO(silviuc): Refactor so that retry logic can be applied.
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
-    """Submits for remote execution a job described by the workflow proto."""
-    # Stage job resources and add an environment proto with their paths.
+    """Creates a job description.
+    Additionally, it may stage it and/or submit it for remote execution.
+    """
+    self.create_job_description(job)
+
+    # Stage and submit the job when necessary
+    dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
+    template_location = (
+        job.options.view_as(GoogleCloudOptions).template_location)
+
+    job_location = template_location or dataflow_job_file
+    if job_location:
+      gcs_or_local_path = os.path.dirname(job_location)
+      file_name = os.path.basename(job_location)
+      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+
+    if not template_location:
+      return self.submit_job_description()
+    else:
+      return None
+
+  def create_job_description(self, job):
+    """Creates a job described by the workflow proto."""
     resources = dependency.stage_job_resources(
         job.options, file_copy=self._gcs_file_copy)
     job.proto.environment = Environment(
@@ -401,8 +425,10 @@ class DataflowApplicationClient(object):
         environment_version=self.environment_version).proto
     # TODO(silviuc): Remove the debug logging eventually.
     logging.info('JOB: %s', job)
-    request = dataflow.DataflowProjectsJobsCreateRequest()
 
+  def submit_job_description(self):
+    """Creates and excutes a job request."""
+    request = dataflow.DataflowProjectsJobsCreateRequest()
     request.projectId = self.google_cloud_options.project
     request.job = job.proto
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 00b466b..8b953b0 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -160,20 +160,25 @@ class DataflowPipelineRunner(PipelineRunner):
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.internal import apiclient
     self.job = apiclient.Job(pipeline.options)
+
     # The superclass's run will trigger a traversal of all reachable nodes.
     super(DataflowPipelineRunner, self).run(pipeline)
-    # Get a Dataflow API client and submit the job.
+
     standard_options = pipeline.options.view_as(StandardOptions)
     if standard_options.streaming:
       job_version = DataflowPipelineRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
     else:
       job_version = DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
+
+    # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
         pipeline.options, job_version)
+
+    # Create the job
     self.result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job))
 
-    if self.blocking:
+    if self.result.has_job and self.blocking:
       thread = threading.Thread(
           target=DataflowPipelineRunner.poll_for_job_completion,
           args=(self, self.result.job_id()))
@@ -652,6 +657,10 @@ class DataflowPipelineResult(PipelineResult):
   def job_id(self):
     return self._job.id
 
+  @property
+  def has_job(self):
+    return self._job is not None
+
   def current_state(self):
     """Return the current state of the remote job.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/template_runner_test.py b/sdks/python/apache_beam/template_runner_test.py
new file mode 100644
index 0000000..bfcd70c
--- /dev/null
+++ b/sdks/python/apache_beam/template_runner_test.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for templated pipelines."""
+
+from __future__ import absolute_import
+
+import json
+import unittest
+import tempfile
+
+import apache_beam as beam
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.internal import apiclient
+
+
+class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
+  """TemplatingDataflow tests."""
+  def test_full_completion(self):
+    dummy_file = tempfile.NamedTemporaryFile()
+    dummy_dir = tempfile.mkdtemp()
+
+    remote_runner = DataflowPipelineRunner()
+    pipeline = Pipeline(remote_runner,
+                        options=PipelineOptions([
+                            '--dataflow_endpoint=ignored',
+                            '--sdk_location=' + dummy_file.name,
+                            '--job_name=test-job',
+                            '--project=test-project',
+                            '--staging_location=' + dummy_dir,
+                            '--temp_location=/dev/null',
+                            '--template_location=' + dummy_file.name,
+                            '--no_auth=True']))
+
+    pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
+    pipeline.run()
+    with open(dummy_file.name) as template_file:
+      saved_job_dict = json.load(template_file)
+      self.assertEqual(
+          saved_job_dict['environment']['sdkPipelineOptions']
+          ['options']['project'], 'test-project')
+      self.assertEqual(
+          saved_job_dict['environment']['sdkPipelineOptions']
+          ['options']['job_name'], 'test-job')
+
+  def test_bad_path(self):
+    dummy_sdk_file = tempfile.NamedTemporaryFile()
+    remote_runner = DataflowPipelineRunner()
+    pipeline = Pipeline(remote_runner,
+                        options=PipelineOptions([
+                            '--dataflow_endpoint=ignored',
+                            '--sdk_location=' + dummy_sdk_file.name,
+                            '--job_name=test-job',
+                            '--project=test-project',
+                            '--staging_location=ignored',
+                            '--temp_location=/dev/null',
+                            '--template_location=/bad/path',
+                            '--no_auth=True']))
+    remote_runner.job = apiclient.Job(pipeline.options)
+
+    with self.assertRaises(IOError):
+      pipeline.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index aacb186..eaa1065 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -263,6 +263,10 @@ class GoogleCloudOptions(PipelineOptions):
                         default=None,
                         help='Identity to run virtual machines as.')
     parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+    # Option to run templated pipelines
+    parser.add_argument('--template_location',
+                        default=None,
+                        help='Save job to specified local or GCS location.')
 
   def validate(self, validator):
     errors = []
@@ -272,6 +276,12 @@ class GoogleCloudOptions(PipelineOptions):
       if getattr(self, 'temp_location',
                  None) or getattr(self, 'staging_location', None) is None:
         errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+
+    if self.view_as(DebugOptions).dataflow_job_file:
+      if self.view_as(GoogleCloudOptions).template_location:
+        errors.append('--dataflow_job_file and --template_location '
+                      'are mutually exclusive.')
+
     return errors
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index ed55362..d9439e9 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -152,6 +152,19 @@ class PipelineOptionsTest(unittest.TestCase):
     options = PipelineOptions(flags=[''])
     self.assertEqual(options.get_all_options()['extra_packages'], None)
 
+  def test_dataflow_job_file(self):
+    options = PipelineOptions(['--dataflow_job_file', 'abc'])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], None)
+
+  def test_template_location(self):
+    options = PipelineOptions(['--template_location', 'abc'])
+    self.assertEqual(options.get_all_options()['template_location'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['template_location'], None)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index bca9fa5..bffbeca 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -260,6 +260,34 @@ class SetupTest(unittest.TestCase):
           PipelineOptions(case['options']), case['runner'])
       self.assertEqual(validator.is_service_runner(), case['expected'])
 
+  def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--template_location', 'abc',
+        '--dataflow_job_file', 'def'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_validate_template_location(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--template_location', 'abc',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertFalse(errors)
+
+  def test_validate_dataflow_job_file(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--dataflow_job_file', 'abc'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertFalse(errors)
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()


[2/2] incubator-beam git commit: Closes #1342

Posted by ro...@apache.org.
Closes #1342


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d99856f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d99856f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d99856f

Branch: refs/heads/python-sdk
Commit: 0d99856f37d6bca9bb8d676ae36157bd0515a4f2
Parents: 7c5e4aa cfa0ad8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Dec 5 11:04:35 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Dec 5 11:04:35 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/wordcount.py   |  1 -
 sdks/python/apache_beam/internal/apiclient.py   | 34 +++++++-
 .../apache_beam/runners/dataflow_runner.py      | 13 ++-
 sdks/python/apache_beam/template_runner_test.py | 83 ++++++++++++++++++++
 sdks/python/apache_beam/utils/options.py        | 10 +++
 .../apache_beam/utils/pipeline_options_test.py  | 13 +++
 .../utils/pipeline_options_validator_test.py    | 28 +++++++
 7 files changed, 175 insertions(+), 7 deletions(-)
----------------------------------------------------------------------