You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/07/12 11:17:14 UTC
[airflow] branch main updated: Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c618da444e Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
c618da444e is described below
commit c618da444e841afcfd73eeb0bce9c87648c89140
Author: Pengyu Wang <hn...@gmail.com>
AuthorDate: Tue Jul 12 04:17:06 2022 -0700
Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
* Fixed BigQueryCreateExternalTableOperator and its unit test (#24160)
---
.../providers/google/cloud/operators/bigquery.py | 50 ++++++++++++++-------
.../google/cloud/operators/test_bigquery.py | 52 +++++++++++++++-------
2 files changed, 69 insertions(+), 33 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 81cce5ca2b..4c78f6d5ba 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1143,23 +1143,41 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
- table = bq_hook.create_external_table(
- external_project_dataset_table=self.destination_project_dataset_table,
- schema_fields=schema_fields,
- source_uris=source_uris,
- source_format=self.source_format,
- autodetect=self.autodetect,
- compression=self.compression,
- skip_leading_rows=self.skip_leading_rows,
- field_delimiter=self.field_delimiter,
- max_bad_records=self.max_bad_records,
- quote_character=self.quote_character,
- allow_quoted_newlines=self.allow_quoted_newlines,
- allow_jagged_rows=self.allow_jagged_rows,
- src_fmt_configs=self.src_fmt_configs,
- labels=self.labels,
- encryption_configuration=self.encryption_configuration,
+ project_id, dataset_id, table_id = bq_hook.split_tablename(
+ table_input=self.destination_project_dataset_table,
+ default_project_id=bq_hook.project_id or '',
+ )
+
+ table_resource = {
+ "tableReference": {
+ "projectId": project_id,
+ "datasetId": dataset_id,
+ "tableId": table_id,
+ },
+ "labels": self.labels,
+ "schema": {"fields": schema_fields},
+ "externalDataConfiguration": {
+ "source_uris": source_uris,
+ "source_format": self.source_format,
+ "maxBadRecords": self.max_bad_records,
+ "autodetect": self.autodetect,
+ "compression": self.compression,
+ "csvOptions": {
+ "fieldDelimeter": self.field_delimiter,
+ "skipLeadingRows": self.skip_leading_rows,
+ "quote": self.quote_character,
+ "allowQuotedNewlines": self.allow_quoted_newlines,
+ "allowJaggedRows": self.allow_jagged_rows,
+ },
+ },
+ "location": self.location,
+ "encryptionConfiguration": self.encryption_configuration,
+ }
+
+ table = bq_hook.create_empty_table(
+ table_resource=table_resource,
)
+
BigQueryTableLink.persist(
context=context,
task_instance=self,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 9a060d2f6e..7be855a9a0 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -190,7 +190,7 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
def test_execute(self, mock_hook):
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
- destination_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
+ destination_project_dataset_table=f'{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}',
schema_fields=[],
bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_DATA,
@@ -198,23 +198,41 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
autodetect=True,
)
+ mock_hook.return_value.split_tablename.return_value = (
+ TEST_GCP_PROJECT_ID,
+ TEST_DATASET,
+ TEST_TABLE_ID,
+ )
+
operator.execute(context=MagicMock())
- mock_hook.return_value.create_external_table.assert_called_once_with(
- external_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
- schema_fields=[],
- source_uris=[f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA],
- source_format=TEST_SOURCE_FORMAT,
- autodetect=True,
- compression='NONE',
- skip_leading_rows=0,
- field_delimiter=',',
- max_bad_records=0,
- quote_character=None,
- allow_quoted_newlines=False,
- allow_jagged_rows=False,
- src_fmt_configs={},
- labels=None,
- encryption_configuration=None,
+ mock_hook.return_value.create_empty_table.assert_called_once_with(
+ table_resource={
+ "tableReference": {
+ "projectId": TEST_GCP_PROJECT_ID,
+ "datasetId": TEST_DATASET,
+ "tableId": TEST_TABLE_ID,
+ },
+ "labels": None,
+ "schema": {"fields": []},
+ "externalDataConfiguration": {
+ "source_uris": [
+ f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA
+ ],
+ "source_format": TEST_SOURCE_FORMAT,
+ "maxBadRecords": 0,
+ "autodetect": True,
+ "compression": 'NONE',
+ "csvOptions": {
+ "fieldDelimeter": ',',
+ "skipLeadingRows": 0,
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "allowJaggedRows": False,
+ },
+ },
+ "location": None,
+ "encryptionConfiguration": None,
+ }
)