You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/11 16:44:44 UTC
[1/2] incubator-beam git commit: Making the dataflow temp_location argument optional
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk e167d2b5a -> 55236825d
Making the dataflow temp_location argument optional
* Migrated the changes to make temp_location optional.
* Modifying temp_location flag documentation
* Adding a log message when defaulting to staging_location
* Add validation that staging_location is given if temp_location isn't, and add test cases
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ce07b0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ce07b0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ce07b0f
Branch: refs/heads/python-sdk
Commit: 8ce07b0f67e5ed21b43d9070552aa73918e30b9a
Parents: e167d2b
Author: Zohar Yahav <zo...@smtp.corp.google.com>
Authored: Fri Jul 8 14:03:19 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 09:44:08 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 12 +++--
sdks/python/apache_beam/utils/options.py | 5 +-
.../utils/pipeline_options_validator_test.py | 52 +++++++++++++++-----
3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 0bb30ac..99c7090 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -339,16 +339,20 @@ class Job(object):
def __init__(self, options):
self.options = options
self.google_cloud_options = options.view_as(GoogleCloudOptions)
- required_google_cloud_options = ['project',
- 'job_name',
- 'staging_location',
- 'temp_location']
+ required_google_cloud_options = ['project', 'job_name', 'staging_location']
missing = [
option for option in required_google_cloud_options
if not getattr(self.google_cloud_options, option)]
if missing:
raise ValueError(
'Missing required configuration parameters: %s' % missing)
+
+ if not self.google_cloud_options.temp_location:
+ logging.info('Defaulting to the staging_location as temp_location: %s',
+ self.google_cloud_options.staging_location)
+ (self.google_cloud_options
+ .temp_location) = self.google_cloud_options.staging_location
+
# Make the staging and temp locations job name and time specific. This is
# needed to avoid clashes between job submissions using the same staging
# area or team members using same job names. This method is not entirely
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index e7a6f52..335beea 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -236,6 +236,7 @@ class GoogleCloudOptions(PipelineOptions):
help='GCS path for staging code packages needed by '
'workers.')
# Remote execution must check that this option is not None.
+ # If temp_location is not set, it defaults to staging_location.
parser.add_argument('--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
@@ -254,7 +255,9 @@ class GoogleCloudOptions(PipelineOptions):
if validator.is_service_runner():
errors.extend(validator.validate_cloud_options(self))
errors.extend(validator.validate_gcs_path(self, 'staging_location'))
- errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+ if getattr(self, 'temp_location',
+ None) or getattr(self, 'staging_location', None) is None:
+ errors.extend(validator.validate_gcs_path(self, 'temp_location'))
return errors
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ce07b0f/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 1f59261..bca9fa5 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -74,31 +74,59 @@ class SetupTest(unittest.TestCase):
[])
def test_gcs_path(self):
- def get_validator(temp_location):
- options = ['--project=example:example', '--job_name=job',
- '--staging_location=gs://foo/bar']
+ def get_validator(temp_location, staging_location):
+ options = ['--project=example:example', '--job_name=job']
if temp_location is not None:
options.append('--temp_location=' + temp_location)
+ if staging_location is not None:
+ options.append('--staging_location=' + staging_location)
+
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowPipelineRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
test_cases = [
- {'temp_location': None, 'errors': ['temp_location']},
- {'temp_location': 'gcs:/foo/bar', 'errors': ['temp_location']},
- {'temp_location': 'gs:/foo/bar', 'errors': ['temp_location']},
- {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
- {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
- {'temp_location': 'gs://foo', 'errors': ['temp_location']},
- {'temp_location': 'gs://foo/', 'errors': []},
- {'temp_location': 'gs://foo/bar', 'errors': []},
+ {'temp_location': None,
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []},
+ {'temp_location': None,
+ 'staging_location': None,
+ 'errors': ['staging_location', 'temp_location']},
+ {'temp_location': 'gs://foo/bar',
+ 'staging_location': None,
+ 'errors': ['staging_location']},
+ {'temp_location': 'gs://foo/bar',
+ 'staging_location': 'gs://ABC/bar',
+ 'errors': ['staging_location']},
+ {'temp_location': 'gcs:/foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': ['temp_location']},
+ {'temp_location': 'gs:/foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': ['temp_location']},
+ {'temp_location': 'gs://ABC/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': ['temp_location']},
+ {'temp_location': 'gs://ABC/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': ['temp_location']},
+ {'temp_location': 'gs://foo',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': ['temp_location']},
+ {'temp_location': 'gs://foo/',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []},
+ {'temp_location': 'gs://foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []},
]
for case in test_cases:
- errors = get_validator(case['temp_location']).validate()
+ errors = get_validator(case['temp_location'],
+ case['staging_location']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
[2/2] incubator-beam git commit: Closes #615
Posted by dh...@apache.org.
Closes #615
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55236825
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55236825
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55236825
Branch: refs/heads/python-sdk
Commit: 55236825d716cc1d21e898dba14bb8cf26632546
Parents: e167d2b 8ce07b0
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 11 09:44:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 09:44:34 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 12 +++--
sdks/python/apache_beam/utils/options.py | 5 +-
.../utils/pipeline_options_validator_test.py | 52 +++++++++++++++-----
3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------