You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/09/30 17:49:31 UTC

[beam] branch master updated: Passing project properly in BQSource

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a08c01  Passing project properly in BQSource
     new a43ec7e  Merge pull request #12966 from Passing project properly in BQSource
1a08c01 is described below

commit 1a08c01ab1cadec16cc13866d5e6c64f6b447b03
Author: Pablo Estrada <pa...@apache.org>
AuthorDate: Tue Sep 29 12:08:40 2020 -0700

    Passing project properly in BQSource
---
 sdks/python/apache_beam/io/gcp/bigquery.py       | 7 ++++++-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 1 +
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4c42ed3..de68364 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -743,11 +743,13 @@ class _CustomBigQuerySource(BoundedSource):
       if (isinstance(self.table_reference, vp.ValueProvider) and
           self.table_reference.is_accessible()):
         table_ref = bigquery_tools.parse_table_reference(
-            self.table_reference.get(), project=self.project)
+            table_ref, project=self._get_project())
       elif isinstance(self.table_reference, vp.ValueProvider):
         # Size estimation is best effort. We return None as we have
         # no access to the table that we're querying.
         return None
+      if not table_ref.projectId:
+        table_ref.projectId = self._get_project()
       table = bq.get_table(
           table_ref.projectId, table_ref.datasetId, table_ref.tableId)
       return int(table.numBytes)
@@ -804,6 +806,9 @@ class _CustomBigQuerySource(BoundedSource):
         self._setup_temporary_dataset(bq)
         self.table_reference = self._execute_query(bq)
 
+      if not self.table_reference.projectId:
+        self.table_reference.projectId = self._get_project()
+
       schema, metadata_list = self._export_files(bq)
       self.split_result = [
           self._create_source(metadata.path, schema)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 100f8dc..1e29f3d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -794,6 +794,7 @@ class BigQueryWrapper(object):
             ),
             jobReference=job_reference,
         ))
+    logging.info('Performing BigQuery extract job: %s', request)
     response = self.client.jobs.Insert(request)
     return response.jobReference