Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/03 17:52:52 UTC

[airflow] branch main updated: fix BigQueryInsertJobOperator (#24165)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a597a76e8f fix BigQueryInsertJobOperator (#24165)
a597a76e8f is described below

commit a597a76e8f893865e7380b072de612763639bfb9
Author: raphaelauv <ra...@users.noreply.github.com>
AuthorDate: Fri Jun 3 19:52:45 2022 +0200

    fix BigQueryInsertJobOperator (#24165)
---
 .../providers/google/cloud/operators/bigquery.py   | 19 ++++++----
 .../google/cloud/operators/test_bigquery.py        | 44 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 9 deletions(-)
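
For context: before this change, execute() read the destination table unconditionally via job.to_api_repr()["configuration"]["query"]["destinationTable"]. Non-query jobs (copy, load, extract) carry no "query" key in their API representation, so the lookup raised KeyError before the job id could be returned. A minimal sketch of the failure mode, using a copy-job payload shaped like the one in the new test below (all values are placeholders):

    # Pre-fix behaviour, sketched: the unconditional "query" lookup
    # raises KeyError for a copy job's API representation.
    api_repr = {
        "configuration": {
            "copy": {
                "sourceTable": "aaa",
                "destinationTable": "bbb",
            }
        },
        "jobReference": "a",
    }

    try:
        table = api_repr["configuration"]["query"]["destinationTable"]
    except KeyError as exc:
        print(f"lookup failed: KeyError({exc})")  # lookup failed: KeyError('query')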

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 268bbc1cad..a53b4bad05 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2167,14 +2167,17 @@ class BigQueryInsertJobOperator(BaseOperator):
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
 
-        table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
-        BigQueryTableLink.persist(
-            context=context,
-            task_instance=self,
-            dataset_id=table["datasetId"],
-            project_id=table["projectId"],
-            table_id=table["tableId"],
-        )
+        if "query" in job.to_api_repr()["configuration"]:
+            if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
+                table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
+                BigQueryTableLink.persist(
+                    context=context,
+                    task_instance=self,
+                    dataset_id=table["datasetId"],
+                    project_id=table["projectId"],
+                    table_id=table["tableId"],
+                )
+
         self.job_id = job.job_id
         return job.job_id
 
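The committed guard uses two nested membership checks. An equivalent formulation with chained dict.get() calls, shown here only as a sketch (not the committed code), makes the intent explicit: persist a table link only for query jobs that define a destination table.

    # Sketch of an equivalent guard (not the committed code): fall back
    # to an empty dict when the job is not a query job.
    table = job.to_api_repr()["configuration"].get("query", {}).get("destinationTable")
    if table:
        BigQueryTableLink.persist(
            context=context,
            task_instance=self,
            dataset_id=table["datasetId"],
            project_id=table["projectId"],
            table_id=table["tableId"],
        )
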
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index b5e42cce2f..42f8794e8a 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -788,7 +788,7 @@ class TestBigQueryUpsertTableOperator(unittest.TestCase):
 class TestBigQueryInsertJobOperator:
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
-    def test_execute_success(self, mock_hook, mock_md5):
+    def test_execute_query_success(self, mock_hook, mock_md5):
         job_id = "123456"
         hash_ = "hash"
         real_job_id = f"{job_id}_{hash_}"
@@ -822,6 +822,48 @@ class TestBigQueryInsertJobOperator:
 
         assert result == real_job_id
 
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute_copy_success(self, mock_hook, mock_md5):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+        mock_md5.return_value.hexdigest.return_value = hash_
+
+        configuration = {
+            "copy": {
+                "sourceTable": "aaa",
+                "destinationTable": "bbb",
+            }
+        }
+        mock_configuration = {
+            "configuration": configuration,
+            "jobReference": "a",
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+        mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration
+
+        op = BigQueryInsertJobOperator(
+            task_id="copy_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+        result = op.execute(context=MagicMock())
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=real_job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            retry=DEFAULT_RETRY,
+            timeout=None,
+        )
+
+        assert result == real_job_id
+
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
     def test_on_kill(self, mock_hook, mock_md5):
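
For illustration, here is a hypothetical operator instantiation exercising the case this commit fixes: a copy job, whose API representation has no "query" section. Before the fix, execute() raised KeyError('query') while trying to persist the table link; after it, the link step is skipped and the job id is returned. All project, dataset, and table names below are made up.

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    # Hypothetical copy job; identifiers are placeholders.
    copy_job = BigQueryInsertJobOperator(
        task_id="copy_table",
        configuration={
            "copy": {
                "sourceTable": {
                    "projectId": "my-project",
                    "datasetId": "my_dataset",
                    "tableId": "src_table",
                },
                "destinationTable": {
                    "projectId": "my-project",
                    "datasetId": "my_dataset",
                    "tableId": "dst_table",
                },
            }
        },
        location="US",
        project_id="my-project",
    )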