Posted to commits@beam.apache.org by al...@apache.org on 2017/03/28 01:11:22 UTC

[1/2] beam git commit: Add region option to Dataflow pipeline options.

Repository: beam
Updated Branches:
  refs/heads/master e58155263 -> fe441e34b


Add region option to Dataflow pipeline options.
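
For readers wanting to see the new option in use, a minimal sketch follows. The
flag names come from the diff below; the project name and GCS path are
hypothetical placeholders, and per the comment in the diff the service does not
yet honor the region setting:

    # Sketch: passing the new --region flag through PipelineOptions.
    # 'my-project' and the bucket path are hypothetical placeholders.
    from apache_beam.utils.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--project', 'my-project',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/tmp',
    ])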


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8d251fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8d251fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8d251fc

Branch: refs/heads/master
Commit: c8d251fc682489909be387b6270a3c053ab3f187
Parents: e581552
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 17:09:34 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 18:10:49 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py      | 35 +++++++++++---------
 .../apache_beam/utils/pipeline_options.py       | 11 +++++-
 2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c8d251fc/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index f7daed0..6fa2f26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -451,11 +451,12 @@ class DataflowApplicationClient(object):
 
   @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
   def get_job_metrics(self, job_id):
-    request = dataflow.DataflowProjectsJobsGetMetricsRequest()
+    request = dataflow.DataflowProjectsLocationsJobsGetMetricsRequest()
     request.jobId = job_id
+    request.location = self.google_cloud_options.region
     request.projectId = self.google_cloud_options.project
     try:
-      response = self._client.projects_jobs.GetMetrics(request)
+      response = self._client.projects_locations_jobs.GetMetrics(request)
     except exceptions.BadStatusCodeError as e:
       logging.error('HTTP status %d. Unable to query metrics',
                     e.response.status)
@@ -464,12 +465,13 @@ class DataflowApplicationClient(object):
 
   def submit_job_description(self, job):
     """Creates and excutes a job request."""
-    request = dataflow.DataflowProjectsJobsCreateRequest()
+    request = dataflow.DataflowProjectsLocationsJobsCreateRequest()
     request.projectId = self.google_cloud_options.project
+    request.location = self.google_cloud_options.region
     request.job = job.proto
 
     try:
-      response = self._client.projects_jobs.Create(request)
+      response = self._client.projects_locations_jobs.Create(request)
     except exceptions.BadStatusCodeError as e:
       logging.error('HTTP status %d trying to create job'
                     ' at dataflow service endpoint %s',
@@ -509,9 +511,10 @@ class DataflowApplicationClient(object):
       # Other states could only be set by the service.
       return False
 
-    request = dataflow.DataflowProjectsJobsUpdateRequest()
+    request = dataflow.DataflowProjectsLocationsJobsUpdateRequest()
     request.jobId = job_id
     request.projectId = self.google_cloud_options.project
+    request.location = self.google_cloud_options.region
     request.job = dataflow.Job(requestedState=new_state)
 
-    self._client.projects_jobs.Update(request)
+    self._client.projects_locations_jobs.Update(request)
@@ -539,10 +542,11 @@ class DataflowApplicationClient(object):
         (e.g. '2015-03-10T00:01:53.074Z')
       currentStateTime: UTC time for the current state of the job.
     """
-    request = dataflow.DataflowProjectsJobsGetRequest()
+    request = dataflow.DataflowProjectsLocationsJobsGetRequest()
     request.jobId = job_id
     request.projectId = self.google_cloud_options.project
-    response = self._client.projects_jobs.Get(request)
+    request.location = self.google_cloud_options.region
+    response = self._client.projects_locations_jobs.Get(request)
     return response
 
   @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
@@ -588,8 +592,9 @@ class DataflowApplicationClient(object):
         JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
      messageText: A message string.
     """
-    request = dataflow.DataflowProjectsJobsMessagesListRequest(
-        jobId=job_id, projectId=self.google_cloud_options.project)
+    request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
+        jobId=job_id, location=self.google_cloud_options.region,
+        projectId=self.google_cloud_options.project)
     if page_token is not None:
       request.pageToken = page_token
     if start_time is not None:
@@ -599,34 +604,34 @@ class DataflowApplicationClient(object):
     if minimum_importance is not None:
       if minimum_importance == 'JOB_MESSAGE_DEBUG':
         request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
+            dataflow.DataflowProjectsLocationsJobsMessagesListRequest
             .MinimumImportanceValueValuesEnum
             .JOB_MESSAGE_DEBUG)
       elif minimum_importance == 'JOB_MESSAGE_DETAILED':
         request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
+            dataflow.DataflowProjectsLocationsJobsMessagesListRequest
             .MinimumImportanceValueValuesEnum
             .JOB_MESSAGE_DETAILED)
       elif minimum_importance == 'JOB_MESSAGE_BASIC':
         request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
+            dataflow.DataflowProjectsLocationsJobsMessagesListRequest
             .MinimumImportanceValueValuesEnum
             .JOB_MESSAGE_BASIC)
       elif minimum_importance == 'JOB_MESSAGE_WARNING':
         request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
+            dataflow.DataflowProjectsLocationsJobsMessagesListRequest
             .MinimumImportanceValueValuesEnum
             .JOB_MESSAGE_WARNING)
       elif minimum_importance == 'JOB_MESSAGE_ERROR':
         request.minimumImportance = (
-            dataflow.DataflowProjectsJobsMessagesListRequest
+            dataflow.DataflowProjectsLocationsJobsMessagesListRequest
             .MinimumImportanceValueValuesEnum
             .JOB_MESSAGE_ERROR)
       else:
         raise RuntimeError(
             'Unexpected value for minimum_importance argument: %r',
             minimum_importance)
-    response = self._client.projects_jobs_messages.List(request)
+    response = self._client.projects_locations_jobs_messages.List(request)
     return response.jobMessages, response.nextPageToken
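
Every call site above follows the same pattern: swap the per-project request
class for its Locations counterpart, set request.location from the new region
option, and route the call through projects_locations_jobs. A condensed sketch
of that pattern using the Get call (the client and options objects are assumed
to be already configured, as inside DataflowApplicationClient; the import path
is an assumption based on this commit's tree layout):

    # Sketch of the regional request pattern introduced in this commit.
    # `client` is an apitools-generated Dataflow client; `options` is a
    # GoogleCloudOptions view. Both are assumed, not constructed here.
    from apache_beam.runners.dataflow.internal.clients import dataflow

    def get_job_in_region(client, options, job_id):
      request = dataflow.DataflowProjectsLocationsJobsGetRequest()
      request.jobId = job_id
      request.projectId = options.project
      request.location = options.region  # route to the job's region
      return client.projects_locations_jobs.Get(request)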
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c8d251fc/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index d07c328..c2a44ad 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -277,6 +277,15 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
+    # The Cloud Dataflow service does not yet honor this setting. However, once
+    # service support is added, users of this SDK will be able to control the
+    # region. The default is up to the Dataflow service. See
+    # https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
+    # list of valid options.
+    parser.add_argument('--region',
+                        default='us-central1',
+                        help='The Google Compute Engine region for creating '
+                        'the Dataflow job.')
     parser.add_argument('--service_account_name',
                         default=None,
                         help='Name of the service account for Google APIs.')
@@ -336,7 +345,7 @@ class WorkerOptions(PipelineOptions):
         choices=['NONE', 'THROUGHPUT_BASED'],
         default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
         help=
-        ('If and how to auotscale the workerpool.'))
+        ('If and how to autoscale the workerpool.'))
     parser.add_argument(
         '--worker_machine_type',
         dest='machine_type',
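
Once parsed, the new option is read back the same way as the other Google Cloud
options, through the standard view_as mechanism. A short sketch ('my-project'
is a hypothetical placeholder; the printed value reflects the default set in
the diff above):

    # Sketch: reading the new option via the standard view_as pattern.
    from apache_beam.utils.pipeline_options import (
        GoogleCloudOptions, PipelineOptions)

    options = PipelineOptions(['--project', 'my-project'])
    gcloud_options = options.view_as(GoogleCloudOptions)
    print(gcloud_options.region)  # 'us-central1' unless overridden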


[2/2] beam git commit: This closes #2342

Posted by al...@apache.org.
This closes #2342


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe441e34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe441e34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe441e34

Branch: refs/heads/master
Commit: fe441e34bbb2e0ddb438c72f9801533e7ddb29c4
Parents: e581552 c8d251f
Author: Ahmet Altay <al...@google.com>
Authored: Mon Mar 27 18:11:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Mar 27 18:11:11 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py      | 35 +++++++++++---------
 .../apache_beam/utils/pipeline_options.py       | 11 +++++-
 2 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------