You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:30 UTC
[46/50] [abbrv] beam git commit: add temp dataset location for non-query BigQuerySource
add temp dataset location for non-query BigQuerySource
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83dc58ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83dc58ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83dc58ee
Branch: refs/heads/gearpump-runner
Commit: 83dc58ee95e96586266c72f29e9d7c55c8ca0739
Parents: 3ef614c
Author: Uwe Jugel <uw...@lovoo.com>
Authored: Wed Apr 12 14:56:50 2017 +0200
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 19 17:55:12 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 47 ++++++++++++++++++++-----
1 file changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/83dc58ee/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 25f544d..4686518 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -605,9 +605,27 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
else:
self.query = self.source.query
+ def _get_source_table_location(self):
+ tr = self.source.table_reference
+ if tr is None:
+ # TODO: implement location retrieval for query sources
+ return
+
+ if tr.projectId is None:
+ source_project_id = self.executing_project
+ else:
+ source_project_id = tr.projectId
+
+ source_dataset_id = tr.datasetId
+ source_table_id = tr.tableId
+ source_location = self.client.get_table_location(
+ source_project_id, source_dataset_id, source_table_id)
+ return source_location
+
def __enter__(self):
self.client = BigQueryWrapper(client=self.test_bigquery_client)
- self.client.create_temporary_dataset(self.executing_project)
+ self.client.create_temporary_dataset(
+ self.executing_project, location=self._get_source_table_location())
return self
def __exit__(self, exception_type, exception_value, traceback):
@@ -801,7 +819,7 @@ class BigQueryWrapper(object):
@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def get_or_create_dataset(self, project_id, dataset_id):
+ def get_or_create_dataset(self, project_id, dataset_id, location=None):
# Check if dataset already exists otherwise create it
try:
dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
@@ -809,9 +827,11 @@ class BigQueryWrapper(object):
return dataset
except HttpError as exn:
if exn.status_code == 404:
- dataset = bigquery.Dataset(
- datasetReference=bigquery.DatasetReference(
- projectId=project_id, datasetId=dataset_id))
+ dataset_reference = bigquery.DatasetReference(
+ projectId=project_id, datasetId=dataset_id)
+ dataset = bigquery.Dataset(datasetReference=dataset_reference)
+ if location is not None:
+ dataset.location = location
request = bigquery.BigqueryDatasetsInsertRequest(
projectId=project_id, dataset=dataset)
response = self.client.datasets.Insert(request)
@@ -867,7 +887,15 @@ class BigQueryWrapper(object):
@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def create_temporary_dataset(self, project_id):
+ def get_table_location(self, project_id, dataset_id, table_id):
+ table = self._get_table(project_id, dataset_id, table_id)
+ return table.location
+
+ @retry.with_exponential_backoff(
+ num_retries=MAX_RETRIES,
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ def create_temporary_dataset(self, project_id, location=None):
+ # TODO: make location required, once "query" locations can be determined
dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
# Check if dataset exists to make sure that the temporary id is unique
try:
@@ -881,9 +909,10 @@ class BigQueryWrapper(object):
except HttpError as exn:
if exn.status_code == 404:
logging.warning(
- 'Dataset %s:%s does not exist so we will create it as temporary',
- project_id, dataset_id)
- self.get_or_create_dataset(project_id, dataset_id)
+ 'Dataset %s:%s does not exist so we will create it as temporary '
+ 'with location=%s',
+ project_id, dataset_id, location)
+ self.get_or_create_dataset(project_id, dataset_id, location=location)
else:
raise