You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/28 01:11:22 UTC
[1/2] beam git commit: Add region option to Dataflow pipeline options.
Repository: beam
Updated Branches:
refs/heads/master e58155263 -> fe441e34b
Add region option to Dataflow pipeline options.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8d251fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8d251fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8d251fc
Branch: refs/heads/master
Commit: c8d251fc682489909be387b6270a3c053ab3f187
Parents: e581552
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 17:09:34 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 18:10:49 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/internal/apiclient.py | 35 +++++++++++---------
.../apache_beam/utils/pipeline_options.py | 11 +++++-
2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c8d251fc/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index f7daed0..6fa2f26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -451,11 +451,12 @@ class DataflowApplicationClient(object):
@retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
def get_job_metrics(self, job_id):
- request = dataflow.DataflowProjectsJobsGetMetricsRequest()
+ request = dataflow.DataflowProjectsLocationsJobsGetMetricsRequest()
request.jobId = job_id
+ request.location = self.google_cloud_options.region
request.projectId = self.google_cloud_options.project
try:
- response = self._client.projects_jobs.GetMetrics(request)
+ response = self._client.projects_locations_jobs.GetMetrics(request)
except exceptions.BadStatusCodeError as e:
logging.error('HTTP status %d. Unable to query metrics',
e.response.status)
@@ -464,12 +465,13 @@ class DataflowApplicationClient(object):
def submit_job_description(self, job):
"""Creates and excutes a job request."""
- request = dataflow.DataflowProjectsJobsCreateRequest()
+ request = dataflow.DataflowProjectsLocationsJobsCreateRequest()
request.projectId = self.google_cloud_options.project
+ request.location = self.google_cloud_options.region
request.job = job.proto
try:
- response = self._client.projects_jobs.Create(request)
+ response = self._client.projects_locations_jobs.Create(request)
except exceptions.BadStatusCodeError as e:
logging.error('HTTP status %d trying to create job'
' at dataflow service endpoint %s',
@@ -509,9 +511,10 @@ class DataflowApplicationClient(object):
# Other states could only be set by the service.
return False
- request = dataflow.DataflowProjectsJobsUpdateRequest()
+ request = dataflow.DataflowProjectsLocationsJobsUpdateRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
+ request.location = self.google_cloud_options.region
request.job = dataflow.Job(requestedState=new_state)
self._client.projects_jobs.Update(request)
@@ -539,10 +542,11 @@ class DataflowApplicationClient(object):
(e.g. '2015-03-10T00:01:53.074Z')
currentStateTime: UTC time for the current state of the job.
"""
- request = dataflow.DataflowProjectsJobsGetRequest()
+ request = dataflow.DataflowProjectsLocationsJobsGetRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
- response = self._client.projects_jobs.Get(request)
+ request.location = self.google_cloud_options.region
+ response = self._client.projects_locations_jobs.Get(request)
return response
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
@@ -588,8 +592,9 @@ class DataflowApplicationClient(object):
JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
messageText: A message string.
"""
- request = dataflow.DataflowProjectsJobsMessagesListRequest(
- jobId=job_id, projectId=self.google_cloud_options.project)
+ request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
+ jobId=job_id, location=self.google_cloud_options.region,
+ projectId=self.google_cloud_options.project)
if page_token is not None:
request.pageToken = page_token
if start_time is not None:
@@ -599,34 +604,34 @@ class DataflowApplicationClient(object):
if minimum_importance is not None:
if minimum_importance == 'JOB_MESSAGE_DEBUG':
request.minimumImportance = (
- dataflow.DataflowProjectsJobsMessagesListRequest
+ dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_DEBUG)
elif minimum_importance == 'JOB_MESSAGE_DETAILED':
request.minimumImportance = (
- dataflow.DataflowProjectsJobsMessagesListRequest
+ dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_DETAILED)
elif minimum_importance == 'JOB_MESSAGE_BASIC':
request.minimumImportance = (
- dataflow.DataflowProjectsJobsMessagesListRequest
+ dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_BASIC)
elif minimum_importance == 'JOB_MESSAGE_WARNING':
request.minimumImportance = (
- dataflow.DataflowProjectsJobsMessagesListRequest
+ dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_WARNING)
elif minimum_importance == 'JOB_MESSAGE_ERROR':
request.minimumImportance = (
- dataflow.DataflowProjectsJobsMessagesListRequest
+ dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_ERROR)
else:
raise RuntimeError(
'Unexpected value for minimum_importance argument: %r',
minimum_importance)
- response = self._client.projects_jobs_messages.List(request)
+ response = self._client.projects_locations_jobs_messages.List(request)
return response.jobMessages, response.nextPageToken
http://git-wip-us.apache.org/repos/asf/beam/blob/c8d251fc/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index d07c328..c2a44ad 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -277,6 +277,15 @@ class GoogleCloudOptions(PipelineOptions):
parser.add_argument('--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
+ # The Cloud Dataflow service does not yet honor this setting. However, once
+ # service support is added then users of this SDK will be able to control
+ # the region. Default is up to the Dataflow service. See
+ # https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
+ # list of valid options.
+ parser.add_argument('--region',
+ default='us-central1',
+ help='The Google Compute Engine region for creating '
+ 'Dataflow job.')
parser.add_argument('--service_account_name',
default=None,
help='Name of the service account for Google APIs.')
@@ -336,7 +345,7 @@ class WorkerOptions(PipelineOptions):
choices=['NONE', 'THROUGHPUT_BASED'],
default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
help=
- ('If and how to auotscale the workerpool.'))
+ ('If and how to autoscale the workerpool.'))
parser.add_argument(
'--worker_machine_type',
dest='machine_type',
[2/2] beam git commit: This closes #2342
Posted by al...@apache.org.
This closes #2342
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe441e34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe441e34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe441e34
Branch: refs/heads/master
Commit: fe441e34bbb2e0ddb438c72f9801533e7ddb29c4
Parents: e581552 c8d251f
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 18:11:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 18:11:11 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/internal/apiclient.py | 35 +++++++++++---------
.../apache_beam/utils/pipeline_options.py | 11 +++++-
2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------