You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/11 16:44:44 UTC

[1/2] incubator-beam git commit: Making the dataflow temp_location argument optional

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e167d2b5a -> 55236825d


Making the dataflow temp_location argument optional

* Migrate the changes to make temp_location optional.
* Modify the temp_location flag documentation.
* Add a log message when defaulting to staging_location.
* Add validation that staging_location is given if temp_location isn't, and add test cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ce07b0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ce07b0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ce07b0f

Branch: refs/heads/python-sdk
Commit: 8ce07b0f67e5ed21b43d9070552aa73918e30b9a
Parents: e167d2b
Author: Zohar Yahav <zo...@smtp.corp.google.com>
Authored: Fri Jul 8 14:03:19 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 09:44:08 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 12 +++--
 sdks/python/apache_beam/utils/options.py        |  5 +-
 .../utils/pipeline_options_validator_test.py    | 52 +++++++++++++++-----
 3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 0bb30ac..99c7090 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -339,16 +339,20 @@ class Job(object):
   def __init__(self, options):
     self.options = options
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    required_google_cloud_options = ['project',
-                                     'job_name',
-                                     'staging_location',
-                                     'temp_location']
+    required_google_cloud_options = ['project', 'job_name', 'staging_location']
     missing = [
         option for option in required_google_cloud_options
         if not getattr(self.google_cloud_options, option)]
     if missing:
       raise ValueError(
           'Missing required configuration parameters: %s' % missing)
+
+    if not self.google_cloud_options.temp_location:
+      logging.info('Defaulting to the staging_location as temp_location: %s',
+                   self.google_cloud_options.staging_location)
+      (self.google_cloud_options
+       .temp_location) = self.google_cloud_options.staging_location
+
     # Make the staging and temp locations job name and time specific. This is
     # needed to avoid clashes between job submissions using the same staging
     # area or team members using same job names. This method is not entirely

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index e7a6f52..335beea 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -236,6 +236,7 @@ class GoogleCloudOptions(PipelineOptions):
                         help='GCS path for staging code packages needed by '
                         'workers.')
     # Remote execution must check that this option is not None.
+    # If temp_location is not set, it defaults to staging_location.
     parser.add_argument('--temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
@@ -254,7 +255,9 @@ class GoogleCloudOptions(PipelineOptions):
     if validator.is_service_runner():
       errors.extend(validator.validate_cloud_options(self))
       errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      if getattr(self, 'temp_location',
+                 None) or getattr(self, 'staging_location', None) is None:
+        errors.extend(validator.validate_gcs_path(self, 'temp_location'))
     return errors
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 1f59261..bca9fa5 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -74,31 +74,59 @@ class SetupTest(unittest.TestCase):
         [])
 
   def test_gcs_path(self):
-    def get_validator(temp_location):
-      options = ['--project=example:example', '--job_name=job',
-                 '--staging_location=gs://foo/bar']
+    def get_validator(temp_location, staging_location):
+      options = ['--project=example:example', '--job_name=job']
 
       if temp_location is not None:
         options.append('--temp_location=' + temp_location)
 
+      if staging_location is not None:
+        options.append('--staging_location=' + staging_location)
+
       pipeline_options = PipelineOptions(options)
       runner = MockRunners.DataflowPipelineRunner()
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
     test_cases = [
-        {'temp_location': None, 'errors': ['temp_location']},
-        {'temp_location': 'gcs:/foo/bar', 'errors': ['temp_location']},
-        {'temp_location': 'gs:/foo/bar', 'errors': ['temp_location']},
-        {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
-        {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
-        {'temp_location': 'gs://foo', 'errors': ['temp_location']},
-        {'temp_location': 'gs://foo/', 'errors': []},
-        {'temp_location': 'gs://foo/bar', 'errors': []},
+        {'temp_location': None,
+         'staging_location': 'gs://foo/bar',
+         'errors': []},
+        {'temp_location': None,
+         'staging_location': None,
+         'errors': ['staging_location', 'temp_location']},
+        {'temp_location': 'gs://foo/bar',
+         'staging_location': None,
+         'errors': ['staging_location']},
+        {'temp_location': 'gs://foo/bar',
+         'staging_location': 'gs://ABC/bar',
+         'errors': ['staging_location']},
+        {'temp_location': 'gcs:/foo/bar',
+         'staging_location': 'gs://foo/bar',
+         'errors': ['temp_location']},
+        {'temp_location': 'gs:/foo/bar',
+         'staging_location': 'gs://foo/bar',
+         'errors': ['temp_location']},
+        {'temp_location': 'gs://ABC/bar',
+         'staging_location': 'gs://foo/bar',
+         'errors': ['temp_location']},
+        {'temp_location': 'gs://ABC/bar',
+         'staging_location': 'gs://foo/bar',
+         'errors': ['temp_location']},
+        {'temp_location': 'gs://foo',
+         'staging_location': 'gs://foo/bar',
+         'errors': ['temp_location']},
+        {'temp_location': 'gs://foo/',
+         'staging_location': 'gs://foo/bar',
+         'errors': []},
+        {'temp_location': 'gs://foo/bar',
+         'staging_location': 'gs://foo/bar',
+         'errors': []},
     ]
 
     for case in test_cases:
-      errors = get_validator(case['temp_location']).validate()
+      errors = get_validator(case['temp_location'],
+                             case['staging_location']).validate()
       self.assertEqual(
           self.check_errors_for_arguments(errors, case['errors']), [])
 


[2/2] incubator-beam git commit: Closes #615

Posted by dh...@apache.org.
Closes #615


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55236825
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55236825
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55236825

Branch: refs/heads/python-sdk
Commit: 55236825d716cc1d21e898dba14bb8cf26632546
Parents: e167d2b 8ce07b0
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 11 09:44:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 09:44:34 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 12 +++--
 sdks/python/apache_beam/utils/options.py        |  5 +-
 .../utils/pipeline_options_validator_test.py    | 52 +++++++++++++++-----
 3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------