You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 11:10:26 UTC

[airflow] branch main updated: Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66b3ca1d28 Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
66b3ca1d28 is described below

commit 66b3ca1d2837610e01cf1d2314fadcc4be0a111c
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Thu Jul 21 19:10:11 2022 +0800

    Migrate Google example bigquery_to_mssql to new design AIP-47 (#25174)
    
    related: #22447, #22430
---
 .../cloud/bigquery}/example_bigquery_to_mssql.py   | 33 ++++++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
similarity index 73%
rename from airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
rename to tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
index 64a18ad07e..18890344cf 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
@@ -30,18 +30,21 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
+DAG_ID = "example_bigquery_to_mssql"
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
 TABLE = "table_42"
 destination_table = "mssql_table_test"
 
 with models.DAG(
-    "example_bigquery_to_mssql",
-    schedule_interval=None,  # Override to match your needs
+    DAG_ID,
+    schedule_interval="@once",  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "bigquery"],
 ) as dag:
     bigquery_to_mssql = BigQueryToMsSqlOperator(
         task_id="bigquery_to_mssql",
@@ -61,10 +64,28 @@ with models.DAG(
             {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
         ],
     )
-    create_dataset >> create_table >> bigquery_to_mssql
 
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
     )
 
-    bigquery_to_mssql >> delete_dataset
+    (
+        # TEST SETUP
+        create_dataset
+        >> create_table
+        # TEST BODY
+        >> bigquery_to_mssql
+        # TEST TEARDOWN
+        >> delete_dataset
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)