You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/12 11:53:50 UTC

[airflow] branch main updated: Migrate Google ads example to new design AIP-47 (#24941)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a038b52b5a Migrate Google ads example to new design AIP-47 (#24941)
a038b52b5a is described below

commit a038b52b5a3bd4a0c97edf9f95da25b2316d305e
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Tue Jul 12 19:53:41 2022 +0800

    Migrate Google ads example to new design AIP-47 (#24941)
    
    related: #22447, #22430
---
 .../example-dags.rst                               |  2 +-
 .../operators/ads.rst                              |  4 +-
 .../system/providers/google/ads}/__init__.py       |  0
 .../system/providers/google/ads}/example_ads.py    | 49 +++++++++++++++++++---
 4 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/docs/apache-airflow-providers-google/example-dags.rst b/docs/apache-airflow-providers-google/example-dags.rst
index 970e6b12b4..eb6d2f6d89 100644
--- a/docs/apache-airflow-providers-google/example-dags.rst
+++ b/docs/apache-airflow-providers-google/example-dags.rst
@@ -20,7 +20,7 @@ Example DAGs
 
 You can learn how to use Google integrations by analyzing the source code of the example DAGs:
 
-* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/ads/example_dags>`__
+* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/ads>`__
 * `Google Cloud (legacy) <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/cloud/example_dags>`__
 * `Google Cloud <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/cloud>`__
 * `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/firebase>`__
diff --git a/docs/apache-airflow-providers-google/operators/ads.rst b/docs/apache-airflow-providers-google/operators/ads.rst
index e1b3a660c8..69908ce4b8 100644
--- a/docs/apache-airflow-providers-google/operators/ads.rst
+++ b/docs/apache-airflow-providers-google/operators/ads.rst
@@ -33,7 +33,7 @@ Google Ads to GCS
 To query the Google Ads API and generate a CSV report of the results use
 :class:`~airflow.providers.google.ads.transfers.ads_to_gcs.GoogleAdsToGcsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/ads/example_dags/example_ads.py
+.. exampleinclude:: /../../tests/system/providers/google/ads/example_ads.py
     :language: python
     :dedent: 4
     :start-after: [START howto_google_ads_to_gcs_operator]
@@ -52,7 +52,7 @@ Upload Google Ads Accounts to GCS
 To upload Google Ads accounts to Google Cloud Storage bucket use the
 :class:`~airflow.providers.google.ads.transfers.ads_to_gcs.GoogleAdsListAccountsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/ads/example_dags/example_ads.py
+.. exampleinclude:: /../../tests/system/providers/google/ads/example_ads.py
     :language: python
     :dedent: 4
     :start-after: [START howto_ads_list_accounts_operator]
diff --git a/airflow/providers/google/ads/example_dags/__init__.py b/tests/system/providers/google/ads/__init__.py
similarity index 100%
rename from airflow/providers/google/ads/example_dags/__init__.py
rename to tests/system/providers/google/ads/__init__.py
diff --git a/airflow/providers/google/ads/example_dags/example_ads.py b/tests/system/providers/google/ads/example_ads.py
similarity index 65%
rename from airflow/providers/google/ads/example_dags/example_ads.py
rename to tests/system/providers/google/ads/example_ads.py
index 85446b563d..015bd00aee 100644
--- a/airflow/providers/google/ads/example_dags/example_ads.py
+++ b/tests/system/providers/google/ads/example_ads.py
@@ -24,10 +24,17 @@ from datetime import datetime
 from airflow import models
 from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
 from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 # [START howto_google_ads_env_variables]
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_google_ads"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 CLIENT_IDS = ["1111111111", "2222222222"]
-BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://INVALID BUCKET NAME")
 GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
 GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
 QUERY = """
@@ -61,28 +68,58 @@ FIELDS_TO_EXTRACT = [
     "metrics.all_conversions.value",
     "metrics.cost_micros.value",
 ]
-
 # [END howto_google_ads_env_variables]
 
 with models.DAG(
-    "example_google_ads",
-    schedule_interval=None,  # Override to match your needs
+    DAG_ID,
+    schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "ads"],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
     # [START howto_google_ads_to_gcs_operator]
     run_operator = GoogleAdsToGcsOperator(
         client_ids=CLIENT_IDS,
         query=QUERY,
         attributes=FIELDS_TO_EXTRACT,
         obj=GCS_OBJ_PATH,
-        bucket=BUCKET,
+        bucket=BUCKET_NAME,
         task_id="run_operator",
     )
     # [END howto_google_ads_to_gcs_operator]
 
     # [START howto_ads_list_accounts_operator]
     list_accounts = GoogleAdsListAccountsOperator(
-        task_id="list_accounts", bucket=BUCKET, object_name=GCS_ACCOUNTS_CSV
+        task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV
     )
     # [END howto_ads_list_accounts_operator]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> run_operator
+        >> list_accounts
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)