You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/21 23:15:35 UTC
[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 3454d691f -> bb09c07b6
Fixing inconsistencies in PipelineOptions
The following options have changed:
* job_name - Default is 'beamapp-username-date-microseconds'. Test was added.
* staging_location and temp_location - Previously, temp_location defaulted to
staging_location when unset. Now temp_location is required and
staging_location defaults to temp_location; the tests reflect that.
* machine_type alias of worker_machine_type has been removed.
* disk_type alias of worker_disk_type has been removed.
* disk_source_image option has been removed.
* no_save_main_session option has been removed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7
Branch: refs/heads/python-sdk
Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1
Parents: 3454d69
Author: Pablo <pa...@google.com>
Authored: Tue Dec 6 18:01:54 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Dec 21 15:14:52 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 45 ++++++++++++--------
.../apache_beam/internal/apiclient_test.py | 6 +++
sdks/python/apache_beam/utils/options.py | 33 ++++++--------
.../utils/pipeline_options_validator.py | 11 ++---
.../utils/pipeline_options_validator_test.py | 8 ++--
5 files changed, 54 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index f1341a7..3a9ba46 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -18,6 +18,8 @@
"""Dataflow client utility functions."""
import codecs
+from datetime import datetime
+import getpass
import json
import logging
import os
@@ -46,10 +48,6 @@ from apache_beam.utils.options import WorkerOptions
from apache_beam.internal.clients import storage
import apache_beam.internal.clients.dataflow as dataflow
-BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-COMPUTE_API_SERVICE = 'compute.googleapis.com'
-STORAGE_API_SERVICE = 'storage.googleapis.com'
-
class Step(object):
"""Wrapper for a dataflow Step protobuf."""
@@ -121,11 +119,13 @@ class Environment(object):
self.worker_options = options.view_as(WorkerOptions)
self.debug_options = options.view_as(DebugOptions)
self.proto = dataflow.Environment()
- self.proto.clusterManagerApiService = COMPUTE_API_SERVICE
- self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE
+ self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+ self.proto.dataset = '{}/cloud_dataflow'.format(
+ GoogleCloudOptions.BIGQUERY_API_SERVICE)
self.proto.tempStoragePrefix = (
- self.google_cloud_options.temp_location.replace('gs:/',
- STORAGE_API_SERVICE))
+ self.google_cloud_options.temp_location.replace(
+ 'gs:/',
+ GoogleCloudOptions.STORAGE_API_SERVICE))
# User agent information.
self.proto.userAgent = dataflow.Environment.UserAgentValue()
self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
@@ -165,7 +165,7 @@ class Environment(object):
dataflow.Package(
location='%s/%s' % (
self.google_cloud_options.staging_location.replace(
- 'gs:/', STORAGE_API_SERVICE),
+ 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
package),
name=package))
@@ -174,7 +174,7 @@ class Environment(object):
packages=package_descriptors,
taskrunnerSettings=dataflow.TaskRunnerSettings(
parallelWorkerSettings=dataflow.WorkerSettings(
- baseUrl='https://dataflow.googleapis.com',
+ baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
servicePath=self.google_cloud_options.dataflow_endpoint)))
pool.autoscalingSettings = dataflow.AutoscalingSettings()
# Set worker pool options received through command line.
@@ -195,8 +195,6 @@ class Environment(object):
pool.diskSizeGb = self.worker_options.disk_size_gb
if self.worker_options.disk_type:
pool.diskType = self.worker_options.disk_type
- if self.worker_options.disk_source_image:
- pool.diskSourceImage = self.worker_options.disk_source_image
if self.worker_options.zone:
pool.zone = self.worker_options.zone
if self.worker_options.network:
@@ -299,10 +297,23 @@ class Job(object):
json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
indent=2, sort_keys=True)
+ @staticmethod
+ def default_job_name(job_name):
+ if job_name is None:
+ user_name = getpass.getuser().lower()
+ date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
+ app_name = 'beamapp'
+ job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
+ return job_name
+
def __init__(self, options):
self.options = options
self.google_cloud_options = options.view_as(GoogleCloudOptions)
- required_google_cloud_options = ['project', 'job_name', 'staging_location']
+ if not self.google_cloud_options.job_name:
+ self.google_cloud_options.job_name = self.default_job_name(
+ self.google_cloud_options.job_name)
+
+ required_google_cloud_options = ['project', 'job_name', 'temp_location']
missing = [
option for option in required_google_cloud_options
if not getattr(self.google_cloud_options, option)]
@@ -310,11 +321,11 @@ class Job(object):
raise ValueError(
'Missing required configuration parameters: %s' % missing)
- if not self.google_cloud_options.temp_location:
- logging.info('Defaulting to the staging_location as temp_location: %s',
- self.google_cloud_options.staging_location)
+ if not self.google_cloud_options.staging_location:
+ logging.info('Defaulting to the temp_location as staging_location: %s',
+ self.google_cloud_options.temp_location)
(self.google_cloud_options
- .temp_location) = self.google_cloud_options.staging_location
+ .staging_location) = self.google_cloud_options.temp_location
# Make the staging and temp locations job name and time specific. This is
# needed to avoid clashes between job submissions using the same staging
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 31b2dad..75d00e0 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -16,6 +16,7 @@
#
"""Unit tests for the apiclient module."""
+import re
import unittest
from apache_beam.utils.options import PipelineOptions
@@ -32,6 +33,11 @@ class UtilTest(unittest.TestCase):
pipeline_options,
DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+ def test_default_job_name(self):
+ job_name = apiclient.Job.default_job_name(None)
+ regexp = 'beamapp-[a-z]*-[0-9]{10}-[0-9]{6}'
+ self.assertTrue(re.match(regexp, job_name))
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index eaa1065..085c09c 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -224,11 +224,16 @@ class TypeOptions(PipelineOptions):
class GoogleCloudOptions(PipelineOptions):
"""Google Cloud Dataflow service execution options."""
+ BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+ COMPUTE_API_SERVICE = 'compute.googleapis.com'
+ STORAGE_API_SERVICE = 'storage.googleapis.com'
+ DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--dataflow_endpoint',
- default='https://dataflow.googleapis.com',
+ default=cls.DATAFLOW_ENDPOINT,
help=
('The URL for the Dataflow API. If not set, the default public URL '
'will be used.'))
@@ -251,7 +256,6 @@ class GoogleCloudOptions(PipelineOptions):
parser.add_argument('--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
- # Options for using service account credentials.
parser.add_argument('--service_account_name',
default=None,
help='Name of the service account for Google APIs.')
@@ -272,10 +276,10 @@ class GoogleCloudOptions(PipelineOptions):
errors = []
if validator.is_service_runner():
errors.extend(validator.validate_cloud_options(self))
- errors.extend(validator.validate_gcs_path(self, 'staging_location'))
- if getattr(self, 'temp_location',
- None) or getattr(self, 'staging_location', None) is None:
- errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+ errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+ if getattr(self, 'staging_location',
+ None) or getattr(self, 'temp_location', None) is None:
+ errors.extend(validator.validate_gcs_path(self, 'staging_location'))
if self.view_as(DebugOptions).dataflow_job_file:
if self.view_as(GoogleCloudOptions).template_location:
@@ -312,9 +316,8 @@ class WorkerOptions(PipelineOptions):
default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
help=
('If and how to auotscale the workerpool.'))
- # TODO(silviuc): Remove --machine_type variant of the flag.
parser.add_argument(
- '--worker_machine_type', '--machine_type',
+ '--worker_machine_type',
dest='machine_type',
default=None,
help=('Machine type to create Dataflow worker VMs as. See '
@@ -329,21 +332,12 @@ class WorkerOptions(PipelineOptions):
help=
('Remote worker disk size, in gigabytes, or 0 to use the default size. '
'If not set, the Dataflow service will use a reasonable default.'))
- # TODO(silviuc): Remove --disk_type variant of the flag.
parser.add_argument(
- '--worker_disk_type', '--disk_type',
+ '--worker_disk_type',
dest='disk_type',
default=None,
help=('Specifies what type of persistent disk should be used.'))
parser.add_argument(
- '--disk_source_image',
- default=None,
- help=
- ('Disk source image to use by VMs for jobs. See '
- 'https://developers.google.com/compute/docs/images for further '
- 'details. If not set, the Dataflow service will use a reasonable '
- 'default.'))
- parser.add_argument(
'--zone',
default=None,
help=(
@@ -461,9 +455,6 @@ class SetupOptions(PipelineOptions):
'Some workflows do not need the session state if for instance all '
'their functions/classes are defined in proper modules (not __main__)'
' and the modules are importable in the worker. '))
- parser.add_argument('--no_save_main_session',
- dest='save_main_session',
- action='store_false')
parser.add_argument(
'--sdk_location',
default='default',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index b7b2978..c248022 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -17,7 +17,6 @@
"""Pipeline options validator.
"""
-
import re
from apache_beam.utils.options import DebugOptions
@@ -144,12 +143,10 @@ class PipelineOptionsValidator(object):
def validate_cloud_options(self, view):
"""Validates job_name and project arguments."""
errors = []
- job_name = view.job_name
- if job_name is None:
- errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'job_name'))
- elif not self.is_full_string_match(self.JOB_PATTERN, job_name):
- errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
-
+ if (view.job_name and
+ not self.is_full_string_match(self.JOB_PATTERN, view.job_name)):
+ errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME,
+ view.job_name))
project = view.project
if project is None:
errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index bffbeca..5e93ff6 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -70,7 +70,7 @@ class SetupTest(unittest.TestCase):
self.assertEqual(
self.check_errors_for_arguments(
errors,
- ['project', 'job_name', 'staging_location', 'temp_location']),
+ ['project', 'staging_location', 'temp_location']),
[])
def test_gcs_path(self):
@@ -91,13 +91,13 @@ class SetupTest(unittest.TestCase):
test_cases = [
{'temp_location': None,
'staging_location': 'gs://foo/bar',
- 'errors': []},
+ 'errors': ['temp_location']},
{'temp_location': None,
'staging_location': None,
'errors': ['staging_location', 'temp_location']},
{'temp_location': 'gs://foo/bar',
'staging_location': None,
- 'errors': ['staging_location']},
+ 'errors': []},
{'temp_location': 'gs://foo/bar',
'staging_location': 'gs://ABC/bar',
'errors': ['staging_location']},
@@ -172,7 +172,7 @@ class SetupTest(unittest.TestCase):
return validator
test_cases = [
- {'job_name': None, 'errors': ['job_name']},
+ {'job_name': None, 'errors': []},
{'job_name': '12345', 'errors': ['job_name']},
{'job_name': 'FOO', 'errors': ['job_name']},
{'job_name': 'foo:bar', 'errors': ['job_name']},
[2/2] incubator-beam git commit: Closes #1526
Posted by ro...@apache.org.
Closes #1526
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b
Branch: refs/heads/python-sdk
Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba
Parents: 3454d69 35e2fdc
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Dec 21 15:15:20 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Dec 21 15:15:20 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 45 ++++++++++++--------
.../apache_beam/internal/apiclient_test.py | 6 +++
sdks/python/apache_beam/utils/options.py | 33 ++++++--------
.../utils/pipeline_options_validator.py | 11 ++---
.../utils/pipeline_options_validator_test.py | 8 ++--
5 files changed, 54 insertions(+), 49 deletions(-)
----------------------------------------------------------------------