Posted to commits@airflow.apache.org by po...@apache.org on 2023/12/17 07:15:51 UTC

(airflow) branch main updated: fix(bigquery.py): pass correct project_id to triggerer (#35200)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ffb003a58e fix(bigquery.py): pass correct project_id to triggerer (#35200)
ffb003a58e is described below

commit ffb003a58e6bd8dbff4de8c0e35e4b69560e914c
Author: Charis-Nicolas Georgiou <91...@users.noreply.github.com>
AuthorDate: Sun Dec 17 08:15:44 2023 +0100

    fix(bigquery.py): pass correct project_id to triggerer (#35200)
---
 .../providers/google/cloud/operators/bigquery.py   | 13 ++++----
 .../google/cloud/operators/test_bigquery.py        | 39 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 6 deletions(-)
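
For context, below is a minimal usage sketch of the situation this commit addresses: a
deferrable BigQueryInsertJobOperator created without an explicit project_id, which after
this change inherits the hook's project_id before deferring, so the trigger polls the job
in the correct project. The DAG id, task id, query, and location are illustrative
assumptions, not part of the commit.

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    with DAG(
        dag_id="example_bigquery_deferrable_insert_job",  # illustrative name
        start_date=datetime(2023, 12, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        insert_query_job = BigQueryInsertJobOperator(
            task_id="insert_query_job",
            configuration={
                "query": {
                    "query": "SELECT 1",  # placeholder query
                    "useLegacySql": False,
                }
            },
            location="EU",  # assumed dataset location
            deferrable=True,
            # project_id deliberately omitted: with this fix the operator falls back to
            # hook.project_id (resolved from the GCP connection) and passes that value
            # on to the BigQueryInsertJobTrigger.
        )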

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 91ff370366..28acc8f949 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2792,6 +2792,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
             impersonation_chain=self.impersonation_chain,
         )
         self.hook = hook
+        if self.project_id is None:
+            self.project_id = hook.project_id
 
         self.job_id = hook.generate_job_id(
             job_id=self.job_id,
@@ -2831,8 +2833,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
             QueryJob._JOB_TYPE: ["destinationTable"],
         }
 
-        project_id = self.project_id or hook.project_id
-        if project_id:
+        if self.project_id:
             for job_type, tables_prop in job_types.items():
                 job_configuration = job.to_api_repr()["configuration"]
                 if job_type in job_configuration:
@@ -2842,7 +2843,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
                             persist_kwargs = {
                                 "context": context,
                                 "task_instance": self,
-                                "project_id": project_id,
+                                "project_id": self.project_id,
                                 "table_id": table,
                             }
                             if not isinstance(table, str):
@@ -2851,11 +2852,11 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
                                 persist_kwargs["project_id"] = table["projectId"]
                             BigQueryTableLink.persist(**persist_kwargs)
         self.job_id = job.job_id
-        project_id = self.project_id or self.hook.project_id
-        if project_id:
+
+        if self.project_id:
             job_id_path = convert_job_id(
                 job_id=self.job_id,  # type: ignore[arg-type]
-                project_id=project_id,
+                project_id=self.project_id,
                 location=self.location,
             )
             context["ti"].xcom_push(key="job_id_path", value=job_id_path)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 9aea410e78..64e31913fb 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1475,6 +1475,45 @@ class TestBigQueryInsertJobOperator:
             exc.value.trigger, BigQueryInsertJobTrigger
         ), "Trigger is not a BigQueryInsertJobTrigger"
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_insert_job_operator_async_inherits_hook_project_id_when_non_given(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        """
+        Asserts that, when no project_id is passed to the BigQueryInsertJobOperator, the
+        deferred BigQueryInsertJobTrigger assumes the project_id of the hook used within
+        the operator.
+        """
+        job_id = "123456"
+
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+        ti = create_task_instance_of_operator(
+            BigQueryInsertJobOperator,
+            dag_id="dag_id",
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            deferrable=True,
+            project_id=None,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            ti.task.execute(MagicMock())
+
+        assert isinstance(
+            exc.value.trigger, BigQueryInsertJobTrigger
+        ), "Trigger is not a BigQueryInsertJobTrigger"
+
+        assert exc.value.trigger.project_id == TEST_GCP_PROJECT_ID
+
     def test_bigquery_insert_job_operator_execute_failure(self):
         """Tests that an AirflowException is raised in case of error event"""
         configuration = {