You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/12 11:53:50 UTC
[airflow] branch main updated: Migrate Google ads example to new design AIP-47 (#24941)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a038b52b5a Migrate Google ads example to new design AIP-47 (#24941)
a038b52b5a is described below
commit a038b52b5a3bd4a0c97edf9f95da25b2316d305e
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Tue Jul 12 19:53:41 2022 +0800
Migrate Google ads example to new design AIP-47 (#24941)
related: #22447, #22430
---
.../example-dags.rst | 2 +-
.../operators/ads.rst | 4 +-
.../system/providers/google/ads}/__init__.py | 0
.../system/providers/google/ads}/example_ads.py | 49 +++++++++++++++++++---
4 files changed, 46 insertions(+), 9 deletions(-)
diff --git a/docs/apache-airflow-providers-google/example-dags.rst b/docs/apache-airflow-providers-google/example-dags.rst
index 970e6b12b4..eb6d2f6d89 100644
--- a/docs/apache-airflow-providers-google/example-dags.rst
+++ b/docs/apache-airflow-providers-google/example-dags.rst
@@ -20,7 +20,7 @@ Example DAGs
You can learn how to use Google integrations by analyzing the source code of the example DAGs:
-* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/ads/example_dags>`__
+* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/ads>`__
* `Google Cloud (legacy) <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/cloud/example_dags>`__
* `Google Cloud <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/cloud>`__
* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/firebase>`__
diff --git a/docs/apache-airflow-providers-google/operators/ads.rst b/docs/apache-airflow-providers-google/operators/ads.rst
index e1b3a660c8..69908ce4b8 100644
--- a/docs/apache-airflow-providers-google/operators/ads.rst
+++ b/docs/apache-airflow-providers-google/operators/ads.rst
@@ -33,7 +33,7 @@ Google Ads to GCS
To query the Google Ads API and generate a CSV report of the results use
:class:`~airflow.providers.google.ads.transfers.ads_to_gcs.GoogleAdsToGcsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/ads/example_dags/example_ads.py
+.. exampleinclude:: /../../tests/system/providers/google/ads/example_ads.py
:language: python
:dedent: 4
:start-after: [START howto_google_ads_to_gcs_operator]
@@ -52,7 +52,7 @@ Upload Google Ads Accounts to GCS
To upload Google Ads accounts to Google Cloud Storage bucket use the
:class:`~airflow.providers.google.ads.transfers.ads_to_gcs.GoogleAdsListAccountsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/ads/example_dags/example_ads.py
+.. exampleinclude:: /../../tests/system/providers/google/ads/example_ads.py
:language: python
:dedent: 4
:start-after: [START howto_ads_list_accounts_operator]
diff --git a/airflow/providers/google/ads/example_dags/__init__.py b/tests/system/providers/google/ads/__init__.py
similarity index 100%
rename from airflow/providers/google/ads/example_dags/__init__.py
rename to tests/system/providers/google/ads/__init__.py
diff --git a/airflow/providers/google/ads/example_dags/example_ads.py b/tests/system/providers/google/ads/example_ads.py
similarity index 65%
rename from airflow/providers/google/ads/example_dags/example_ads.py
rename to tests/system/providers/google/ads/example_ads.py
index 85446b563d..015bd00aee 100644
--- a/airflow/providers/google/ads/example_dags/example_ads.py
+++ b/tests/system/providers/google/ads/example_ads.py
@@ -24,10 +24,17 @@ from datetime import datetime
from airflow import models
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
# [START howto_google_ads_env_variables]
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_google_ads"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CLIENT_IDS = ["1111111111", "2222222222"]
-BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://INVALID BUCKET NAME")
GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
QUERY = """
@@ -61,28 +68,58 @@ FIELDS_TO_EXTRACT = [
"metrics.all_conversions.value",
"metrics.cost_micros.value",
]
-
# [END howto_google_ads_env_variables]
with models.DAG(
- "example_google_ads",
- schedule_interval=None, # Override to match your needs
+ DAG_ID,
+ schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
+ tags=["example", "ads"],
) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
+
# [START howto_google_ads_to_gcs_operator]
run_operator = GoogleAdsToGcsOperator(
client_ids=CLIENT_IDS,
query=QUERY,
attributes=FIELDS_TO_EXTRACT,
obj=GCS_OBJ_PATH,
- bucket=BUCKET,
+ bucket=BUCKET_NAME,
task_id="run_operator",
)
# [END howto_google_ads_to_gcs_operator]
# [START howto_ads_list_accounts_operator]
list_accounts = GoogleAdsListAccountsOperator(
- task_id="list_accounts", bucket=BUCKET, object_name=GCS_ACCOUNTS_CSV
+ task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV
)
# [END howto_ads_list_accounts_operator]
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> run_operator
+ >> list_accounts
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs the watcher in order to properly mark success/failure
+ # when a "tearDown" task with a trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)