Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/03 17:52:52 UTC
[airflow] branch main updated: fix BigQueryInsertJobOperator (#24165)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a597a76e8f fix BigQueryInsertJobOperator (#24165)
a597a76e8f is described below
commit a597a76e8f893865e7380b072de612763639bfb9
Author: raphaelauv <ra...@users.noreply.github.com>
AuthorDate: Fri Jun 3 19:52:45 2022 +0200
fix BigQueryInsertJobOperator (#24165)
---
.../providers/google/cloud/operators/bigquery.py | 19 ++++++----
.../google/cloud/operators/test_bigquery.py | 44 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 268bbc1cad..a53b4bad05 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2167,14 +2167,17 @@ class BigQueryInsertJobOperator(BaseOperator):
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)
- table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
- BigQueryTableLink.persist(
- context=context,
- task_instance=self,
- dataset_id=table["datasetId"],
- project_id=table["projectId"],
- table_id=table["tableId"],
- )
+ if "query" in job.to_api_repr()["configuration"]:
+ if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
+ table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
+ BigQueryTableLink.persist(
+ context=context,
+ task_instance=self,
+ dataset_id=table["datasetId"],
+ project_id=table["projectId"],
+ table_id=table["tableId"],
+ )
+
self.job_id = job.job_id
return job.job_id
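
The change above guards the table-link persistence so that non-query jobs (for example copy, load, or extract jobs, whose configuration has no "query" key) no longer fail on the unconditional destinationTable lookup. A minimal sketch of the guarded pattern, using a hypothetical job payload rather than real Airflow objects:

    # Minimal sketch of the guard introduced above (hypothetical payload, not Airflow code).
    job_repr = {
        "configuration": {
            "copy": {"sourceTable": "aaa", "destinationTable": "bbb"},
        }
    }

    config = job_repr["configuration"]
    if "query" in config and "destinationTable" in config["query"]:
        table = config["query"]["destinationTable"]
        # BigQueryTableLink.persist(...) would run here for query jobs with a destination table
    else:
        # copy/load/extract jobs (or query jobs without a destination table) simply skip the link
        pass
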
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index b5e42cce2f..42f8794e8a 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -788,7 +788,7 @@ class TestBigQueryUpsertTableOperator(unittest.TestCase):
class TestBigQueryInsertJobOperator:
@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
- def test_execute_success(self, mock_hook, mock_md5):
+ def test_execute_query_success(self, mock_hook, mock_md5):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
@@ -822,6 +822,48 @@ class TestBigQueryInsertJobOperator:
assert result == real_job_id
+ @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
+ @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+ def test_execute_copy_success(self, mock_hook, mock_md5):
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+ mock_md5.return_value.hexdigest.return_value = hash_
+
+ configuration = {
+ "copy": {
+ "sourceTable": "aaa",
+ "destinationTable": "bbb",
+ }
+ }
+ mock_configuration = {
+ "configuration": configuration,
+ "jobReference": "a",
+ }
+ mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+ mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration
+
+ op = BigQueryInsertJobOperator(
+ task_id="copy_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
+ result = op.execute(context=MagicMock())
+
+ mock_hook.return_value.insert_job.assert_called_once_with(
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=real_job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
+
+ assert result == real_job_id
+
@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_on_kill(self, mock_hook, mock_md5):
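
For reference, a copy job like the one exercised by the new test_execute_copy_success could be submitted from a DAG roughly as follows. This is a hedged usage sketch: the DAG id, project, dataset, and table names are illustrative placeholders, not values from the commit.

    # Hedged usage sketch: BigQueryInsertJobOperator with a copy configuration.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    with DAG(
        dag_id="bq_copy_example",          # placeholder DAG id
        start_date=datetime(2022, 6, 1),
        schedule_interval=None,
    ) as dag:
        copy_job = BigQueryInsertJobOperator(
            task_id="copy_table",
            configuration={
                "copy": {
                    "sourceTable": {
                        "projectId": "my-project",   # placeholder
                        "datasetId": "my_dataset",   # placeholder
                        "tableId": "source_table",   # placeholder
                    },
                    "destinationTable": {
                        "projectId": "my-project",
                        "datasetId": "my_dataset",
                        "tableId": "dest_table",
                    },
                }
            },
            location="EU",                 # placeholder location
            project_id="my-project",
        )
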