You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/12/17 07:15:51 UTC
(airflow) branch main updated: fix(bigquery.py): pass correct project_id to triggerer (#35200)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ffb003a58e fix(bigquery.py): pass correct project_id to triggerer (#35200)
ffb003a58e is described below
commit ffb003a58e6bd8dbff4de8c0e35e4b69560e914c
Author: Charis-Nicolas Georgiou <91...@users.noreply.github.com>
AuthorDate: Sun Dec 17 08:15:44 2023 +0100
fix(bigquery.py): pass correct project_id to triggerer (#35200)
---
.../providers/google/cloud/operators/bigquery.py | 13 ++++----
.../google/cloud/operators/test_bigquery.py | 39 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 91ff370366..28acc8f949 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2792,6 +2792,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
impersonation_chain=self.impersonation_chain,
)
self.hook = hook
+ if self.project_id is None:
+ self.project_id = hook.project_id
self.job_id = hook.generate_job_id(
job_id=self.job_id,
@@ -2831,8 +2833,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
QueryJob._JOB_TYPE: ["destinationTable"],
}
- project_id = self.project_id or hook.project_id
- if project_id:
+ if self.project_id:
for job_type, tables_prop in job_types.items():
job_configuration = job.to_api_repr()["configuration"]
if job_type in job_configuration:
@@ -2842,7 +2843,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
persist_kwargs = {
"context": context,
"task_instance": self,
- "project_id": project_id,
+ "project_id": self.project_id,
"table_id": table,
}
if not isinstance(table, str):
@@ -2851,11 +2852,11 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)
self.job_id = job.job_id
- project_id = self.project_id or self.hook.project_id
- if project_id:
+
+ if self.project_id:
job_id_path = convert_job_id(
job_id=self.job_id, # type: ignore[arg-type]
- project_id=project_id,
+ project_id=self.project_id,
location=self.location,
)
context["ti"].xcom_push(key="job_id_path", value=job_id_path)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 9aea410e78..64e31913fb 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1475,6 +1475,45 @@ class TestBigQueryInsertJobOperator:
exc.value.trigger, BigQueryInsertJobTrigger
), "Trigger is not a BigQueryInsertJobTrigger"
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_bigquery_insert_job_operator_async_inherits_hook_project_id_when_non_given(
+ self, mock_hook, create_task_instance_of_operator
+ ):
+ """
+ Asserts that the BigQueryInsertJobTrigger created when the task defers uses the project_id
+ of the hook that is used within the BigQueryInsertJobOperator when there is no
+ project_id passed to the BigQueryInsertJobOperator.
+ """
+ job_id = "123456"
+
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+ mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+ ti = create_task_instance_of_operator(
+ BigQueryInsertJobOperator,
+ dag_id="dag_id",
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ deferrable=True,
+ project_id=None,
+ )
+
+ with pytest.raises(TaskDeferred) as exc:
+ ti.task.execute(MagicMock())
+
+ assert isinstance(
+ exc.value.trigger, BigQueryInsertJobTrigger
+ ), "Trigger is not a BigQueryInsertJobTrigger"
+
+ assert exc.value.trigger.project_id == TEST_GCP_PROJECT_ID
+
def test_bigquery_insert_job_operator_execute_failure(self):
"""Tests that an AirflowException is raised in case of error event"""
configuration = {