You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/08 15:52:23 UTC
[airflow] branch main updated: Workaround job race bug on biguery to gcs transfer (#24330)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 047a6162b0 Workaround job race bug on biguery to gcs transfer (#24330)
047a6162b0 is described below
commit 047a6162b0b4cbf07fe2fd978e335839a7d3900b
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 8 17:52:15 2022 +0200
Workaround job race bug on biguery to gcs transfer (#24330)
Fixes: #24277
---
airflow/providers/google/cloud/hooks/bigquery.py | 6 +++++-
airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 7 +++----
tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py | 1 +
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 70795efd65..9edde76f3c 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1894,7 +1894,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
field_delimiter: str = ',',
print_header: bool = True,
labels: Optional[Dict] = None,
- ) -> str:
+ return_full_job: bool = False,
+ ) -> Union[str, BigQueryJob]:
"""
Executes a BigQuery extract command to copy data from BigQuery to
Google Cloud Storage. See here:
@@ -1915,6 +1916,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:param print_header: Whether to print a header for a CSV file extract.
:param labels: a dictionary containing labels for the job/query,
passed to BigQuery
+ :param return_full_job: return full job instead of job id only
"""
warnings.warn(
"This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning
@@ -1953,6 +1955,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
job = self.insert_job(configuration=configuration, project_id=self.project_id)
self.running_job_id = job.job_id
+ if return_full_job:
+ return job
return job.job_id
def run_query(
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 09ac190e0f..fbae5a5b77 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
if TYPE_CHECKING:
@@ -115,7 +115,7 @@ class BigQueryToGCSOperator(BaseOperator):
location=self.location,
impersonation_chain=self.impersonation_chain,
)
- job_id = hook.run_extract(
+ job: BigQueryJob = hook.run_extract(
source_project_dataset_table=self.source_project_dataset_table,
destination_cloud_storage_uris=self.destination_cloud_storage_uris,
compression=self.compression,
@@ -123,9 +123,8 @@ class BigQueryToGCSOperator(BaseOperator):
field_delimiter=self.field_delimiter,
print_header=self.print_header,
labels=self.labels,
+ return_full_job=True,
)
-
- job = hook.get_job(job_id=job_id).to_api_repr()
conf = job["configuration"]["extract"]["sourceTable"]
dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
BigQueryTableLink.persist(
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 5ed9b66031..b627d3672c 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -59,4 +59,5 @@ class TestBigQueryToGCSOperator(unittest.TestCase):
field_delimiter=field_delimiter,
print_header=print_header,
labels=labels,
+ return_full_job=True,
)