Posted to commits@airflow.apache.org by el...@apache.org on 2022/07/12 11:17:14 UTC

[airflow] branch main updated: Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c618da444e Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
c618da444e is described below

commit c618da444e841afcfd73eeb0bce9c87648c89140
Author: Pengyu Wang <hn...@gmail.com>
AuthorDate: Tue Jul 12 04:17:06 2022 -0700

    Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
    
    * Fixed BigQueryCreateExternalTableOperator and its unit test (#24160)
---
 .../providers/google/cloud/operators/bigquery.py   | 50 ++++++++++++++-------
 .../google/cloud/operators/test_bigquery.py        | 52 +++++++++++++++-------
 2 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 81cce5ca2b..4c78f6d5ba 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1143,23 +1143,41 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
 
         source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
 
-        table = bq_hook.create_external_table(
-            external_project_dataset_table=self.destination_project_dataset_table,
-            schema_fields=schema_fields,
-            source_uris=source_uris,
-            source_format=self.source_format,
-            autodetect=self.autodetect,
-            compression=self.compression,
-            skip_leading_rows=self.skip_leading_rows,
-            field_delimiter=self.field_delimiter,
-            max_bad_records=self.max_bad_records,
-            quote_character=self.quote_character,
-            allow_quoted_newlines=self.allow_quoted_newlines,
-            allow_jagged_rows=self.allow_jagged_rows,
-            src_fmt_configs=self.src_fmt_configs,
-            labels=self.labels,
-            encryption_configuration=self.encryption_configuration,
+        project_id, dataset_id, table_id = bq_hook.split_tablename(
+            table_input=self.destination_project_dataset_table,
+            default_project_id=bq_hook.project_id or '',
+        )
+
+        table_resource = {
+            "tableReference": {
+                "projectId": project_id,
+                "datasetId": dataset_id,
+                "tableId": table_id,
+            },
+            "labels": self.labels,
+            "schema": {"fields": schema_fields},
+            "externalDataConfiguration": {
+                "source_uris": source_uris,
+                "source_format": self.source_format,
+                "maxBadRecords": self.max_bad_records,
+                "autodetect": self.autodetect,
+                "compression": self.compression,
+                "csvOptions": {
+                    "fieldDelimeter": self.field_delimiter,
+                    "skipLeadingRows": self.skip_leading_rows,
+                    "quote": self.quote_character,
+                    "allowQuotedNewlines": self.allow_quoted_newlines,
+                    "allowJaggedRows": self.allow_jagged_rows,
+                },
+            },
+            "location": self.location,
+            "encryptionConfiguration": self.encryption_configuration,
+        }
+
+        table = bq_hook.create_empty_table(
+            table_resource=table_resource,
         )
+
         BigQueryTableLink.persist(
             context=context,
             task_instance=self,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 9a060d2f6e..7be855a9a0 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -190,7 +190,7 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
     def test_execute(self, mock_hook):
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
-            destination_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
+            destination_project_dataset_table=f'{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}',
             schema_fields=[],
             bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_DATA,
@@ -198,23 +198,41 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
             autodetect=True,
         )
 
+        mock_hook.return_value.split_tablename.return_value = (
+            TEST_GCP_PROJECT_ID,
+            TEST_DATASET,
+            TEST_TABLE_ID,
+        )
+
         operator.execute(context=MagicMock())
-        mock_hook.return_value.create_external_table.assert_called_once_with(
-            external_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
-            schema_fields=[],
-            source_uris=[f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA],
-            source_format=TEST_SOURCE_FORMAT,
-            autodetect=True,
-            compression='NONE',
-            skip_leading_rows=0,
-            field_delimiter=',',
-            max_bad_records=0,
-            quote_character=None,
-            allow_quoted_newlines=False,
-            allow_jagged_rows=False,
-            src_fmt_configs={},
-            labels=None,
-            encryption_configuration=None,
+        mock_hook.return_value.create_empty_table.assert_called_once_with(
+            table_resource={
+                "tableReference": {
+                    "projectId": TEST_GCP_PROJECT_ID,
+                    "datasetId": TEST_DATASET,
+                    "tableId": TEST_TABLE_ID,
+                },
+                "labels": None,
+                "schema": {"fields": []},
+                "externalDataConfiguration": {
+                    "source_uris": [
+                        f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA
+                    ],
+                    "source_format": TEST_SOURCE_FORMAT,
+                    "maxBadRecords": 0,
+                    "autodetect": True,
+                    "compression": 'NONE',
+                    "csvOptions": {
+                        "fieldDelimeter": ',',
+                        "skipLeadingRows": 0,
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "allowJaggedRows": False,
+                    },
+                },
+                "location": None,
+                "encryptionConfiguration": None,
+            }
         )
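For reference, a minimal sketch of the hook-level call path the operator now takes, mirroring the new code in the diff: the destination string is split with split_tablename, folded into a table_resource dict, and passed to create_empty_table. The project, dataset, bucket, and table names are placeholders, and the dictionary keys are copied from the operator code above rather than being a standalone recipe.

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    bq_hook = BigQueryHook(gcp_conn_id="google_cloud_default")

    # Resolve "project.dataset.table" into its parts, as the operator does.
    project_id, dataset_id, table_id = bq_hook.split_tablename(
        table_input="my-project.my_dataset.my_table",
        default_project_id=bq_hook.project_id or "",
    )

    # Keys follow the operator's table_resource construction in the diff.
    table_resource = {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        "externalDataConfiguration": {
            "source_uris": ["gs://my-bucket/data/part-*.csv"],
            "source_format": "CSV",
            "autodetect": True,
        },
    }

    # The updated hook function: the whole table definition travels in one dict.
    bq_hook.create_empty_table(table_resource=table_resource)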