You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/08 15:52:23 UTC

[airflow] branch main updated: Workaround job race bug on BigQuery to GCS transfer (#24330)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 047a6162b0 Workaround job race bug on BigQuery to GCS transfer (#24330)
047a6162b0 is described below

commit 047a6162b0b4cbf07fe2fd978e335839a7d3900b
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 8 17:52:15 2022 +0200

    Workaround job race bug on BigQuery to GCS transfer (#24330)
    
    Fixes: #24277
---
 airflow/providers/google/cloud/hooks/bigquery.py               | 6 +++++-
 airflow/providers/google/cloud/transfers/bigquery_to_gcs.py    | 7 +++----
 tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py | 1 +
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 70795efd65..9edde76f3c 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1894,7 +1894,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         field_delimiter: str = ',',
         print_header: bool = True,
         labels: Optional[Dict] = None,
-    ) -> str:
+        return_full_job: bool = False,
+    ) -> Union[str, BigQueryJob]:
         """
         Executes a BigQuery extract command to copy data from BigQuery to
         Google Cloud Storage. See here:
@@ -1915,6 +1916,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         :param print_header: Whether to print a header for a CSV file extract.
         :param labels: a dictionary containing labels for the job/query,
             passed to BigQuery
+        :param return_full_job: return full job instead of job id only
         """
         warnings.warn(
             "This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning
@@ -1953,6 +1955,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
 
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
         self.running_job_id = job.job_id
+        if return_full_job:
+            return job
         return job.job_id
 
     def run_query(
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 09ac190e0f..fbae5a5b77 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -19,7 +19,7 @@
 from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 
 if TYPE_CHECKING:
@@ -115,7 +115,7 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        job_id = hook.run_extract(
+        job: BigQueryJob = hook.run_extract(
             source_project_dataset_table=self.source_project_dataset_table,
             destination_cloud_storage_uris=self.destination_cloud_storage_uris,
             compression=self.compression,
@@ -123,9 +123,8 @@ class BigQueryToGCSOperator(BaseOperator):
             field_delimiter=self.field_delimiter,
             print_header=self.print_header,
             labels=self.labels,
+            return_full_job=True,
         )
-
-        job = hook.get_job(job_id=job_id).to_api_repr()
         conf = job["configuration"]["extract"]["sourceTable"]
         dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
         BigQueryTableLink.persist(
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 5ed9b66031..b627d3672c 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -59,4 +59,5 @@ class TestBigQueryToGCSOperator(unittest.TestCase):
             field_delimiter=field_delimiter,
             print_header=print_header,
             labels=labels,
+            return_full_job=True,
         )