You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 11:10:26 UTC
[airflow] branch main updated: Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66b3ca1d28 Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
66b3ca1d28 is described below
commit 66b3ca1d2837610e01cf1d2314fadcc4be0a111c
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Thu Jul 21 19:10:11 2022 +0800
Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
related: #22447, #22430
---
.../cloud/bigquery}/example_bigquery_to_mssql.py | 33 ++++++++++++++++++----
1 file changed, 27 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
similarity index 73%
rename from airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
rename to tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
index 64a18ad07e..18890344cf 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
@@ -30,18 +30,21 @@ from airflow.providers.google.cloud.operators.bigquery import (
)
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
+DAG_ID = "example_bigquery_to_mssql"
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
TABLE = "table_42"
destination_table = "mssql_table_test"
with models.DAG(
- "example_bigquery_to_mssql",
- schedule_interval=None, # Override to match your needs
+ DAG_ID,
+ schedule_interval="@once", # Override to match your needs
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "bigquery"],
) as dag:
bigquery_to_mssql = BigQueryToMsSqlOperator(
task_id="bigquery_to_mssql",
@@ -61,10 +64,28 @@ with models.DAG(
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
- create_dataset >> create_table >> bigquery_to_mssql
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
- bigquery_to_mssql >> delete_dataset
+ (
+ # TEST SETUP
+ create_dataset
+ >> create_table
+ # TEST BODY
+ >> bigquery_to_mssql
+ # TEST TEARDOWN
+ >> delete_dataset
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs the watcher in order to properly mark success/failure
+ # when a "tearDown" task with a trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)