You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:30 UTC

[46/50] [abbrv] beam git commit: add temp dataset location for non-query BigQuerySource

add temp dataset location for non-query BigQuerySource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83dc58ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83dc58ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83dc58ee

Branch: refs/heads/gearpump-runner
Commit: 83dc58ee95e96586266c72f29e9d7c55c8ca0739
Parents: 3ef614c
Author: Uwe Jugel <uw...@lovoo.com>
Authored: Wed Apr 12 14:56:50 2017 +0200
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 19 17:55:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py | 47 ++++++++++++++++++++-----
 1 file changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/83dc58ee/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 25f544d..4686518 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -605,9 +605,27 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     else:
       self.query = self.source.query
 
+  def _get_source_table_location(self):
+    tr = self.source.table_reference
+    if tr is None:
+      # TODO: implement location retrieval for query sources
+      return
+
+    if tr.projectId is None:
+      source_project_id = self.executing_project
+    else:
+      source_project_id = tr.projectId
+
+    source_dataset_id = tr.datasetId
+    source_table_id = tr.tableId
+    source_location = self.client.get_table_location(
+        source_project_id, source_dataset_id, source_table_id)
+    return source_location
+
   def __enter__(self):
     self.client = BigQueryWrapper(client=self.test_bigquery_client)
-    self.client.create_temporary_dataset(self.executing_project)
+    self.client.create_temporary_dataset(
+        self.executing_project, location=self._get_source_table_location())
     return self
 
   def __exit__(self, exception_type, exception_value, traceback):
@@ -801,7 +819,7 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def get_or_create_dataset(self, project_id, dataset_id):
+  def get_or_create_dataset(self, project_id, dataset_id, location=None):
     # Check if dataset already exists otherwise create it
     try:
       dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
@@ -809,9 +827,11 @@ class BigQueryWrapper(object):
       return dataset
     except HttpError as exn:
       if exn.status_code == 404:
-        dataset = bigquery.Dataset(
-            datasetReference=bigquery.DatasetReference(
-                projectId=project_id, datasetId=dataset_id))
+        dataset_reference = bigquery.DatasetReference(
+            projectId=project_id, datasetId=dataset_id)
+        dataset = bigquery.Dataset(datasetReference=dataset_reference)
+        if location is not None:
+          dataset.location = location
         request = bigquery.BigqueryDatasetsInsertRequest(
             projectId=project_id, dataset=dataset)
         response = self.client.datasets.Insert(request)
@@ -867,7 +887,15 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def create_temporary_dataset(self, project_id):
+  def get_table_location(self, project_id, dataset_id, table_id):
+    table = self._get_table(project_id, dataset_id, table_id)
+    return table.location
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def create_temporary_dataset(self, project_id, location=None):
+    # TODO: make location required, once "query" locations can be determined
     dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
     # Check if dataset exists to make sure that the temporary id is unique
     try:
@@ -881,9 +909,10 @@ class BigQueryWrapper(object):
     except HttpError as exn:
       if exn.status_code == 404:
         logging.warning(
-            'Dataset %s:%s does not exist so we will create it as temporary',
-            project_id, dataset_id)
-        self.get_or_create_dataset(project_id, dataset_id)
+            'Dataset %s:%s does not exist so we will create it as temporary '
+            'with location=%s',
+            project_id, dataset_id, location)
+        self.get_or_create_dataset(project_id, dataset_id, location=location)
       else:
         raise