You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/16 07:42:20 UTC

[airflow] branch main updated: Migrate Google campaign manager example to new design AIP-47 (#25069)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4473b27d08 Migrate Google campaign manager example to new design AIP-47 (#25069)
4473b27d08 is described below

commit 4473b27d089ff28d3d73af7c4a8bc6e011134b50
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Sat Jul 16 15:42:12 2022 +0800

    Migrate Google campaign manager example to new design AIP-47 (#25069)
    
    related: #22447, #22430
---
 .../marketing_platform/campaign_manager.rst        | 14 ++---
 .../operators/test_campaign_manager_system.py      | 46 ----------------
 .../example_campaign_manager.py                    | 63 ++++++++++++++++------
 3 files changed, 53 insertions(+), 70 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst
index cbcf5706f5..4305962399 100644
--- a/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst
+++ b/docs/apache-airflow-providers-google/operators/marketing_platform/campaign_manager.rst
@@ -36,7 +36,7 @@ To delete Campaign Manager report you can use the
 :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerDeleteReportOperator`.
 It deletes a report by its unique ID.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_delete_report_operator]
@@ -54,7 +54,7 @@ Downloading a report
 The :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerDownloadReportOperator`
 allows you to download a Campaign Manager report to a Google Cloud Storage bucket.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_get_report_operator]
@@ -72,7 +72,7 @@ Waiting for a report
 Reports are generated asynchronously. To wait for a report to be ready for downloading
 you can use :class:`~airflow.providers.google.marketing_platform.sensors.campaign_manager.GoogleCampaignManagerReportSensor`.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_wait_for_operation]
@@ -91,7 +91,7 @@ To insert a Campaign Manager report you can use the
 :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerInsertReportOperator`.
 Running this operator creates a new report.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_insert_report_operator]
@@ -111,7 +111,7 @@ Running a report
 To run a Campaign Manager report you can use the
 :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerRunReportOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_run_report_operator]
@@ -130,7 +130,7 @@ Inserting a conversions
 To insert Campaign Manager conversions you can use the
 :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchInsertConversionsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_insert_conversions]
@@ -149,7 +149,7 @@ Updating a conversions
 To update Campaign Manager conversions you can use the
 :class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchUpdateConversionsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_campaign_manager.py
     :language: python
     :dedent: 4
     :start-after: [START howto_campaign_manager_update_conversions]
diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py
deleted file mode 100644
index 3838e6255b..0000000000
--- a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import pytest
-
-from airflow.providers.google.marketing_platform.example_dags.example_campaign_manager import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY
-from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-# Required scopes
-SCOPES = [
-    'https://www.googleapis.com/auth/dfatrafficking',
-    'https://www.googleapis.com/auth/dfareporting',
-    'https://www.googleapis.com/auth/ddmconversions',
-    'https://www.googleapis.com/auth/cloud-platform',
-]
-
-
-@pytest.mark.system("google.marketing_platform")
-@pytest.mark.credential_file(GMP_KEY)
-class CampaignManagerSystemTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-        self.create_gcs_bucket(BUCKET)
-
-    def tearDown(self):
-        self.delete_gcs_bucket(BUCKET)
-        super().tearDown()
-
-    @provide_gcp_context(GMP_KEY, scopes=SCOPES)
-    def test_run_example_dag(self):
-        self.run_dag('example_campaign_manager', MARKETING_DAG_FOLDER)
diff --git a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py b/tests/system/providers/google/marketing_platform/example_campaign_manager.py
similarity index 77%
rename from airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
rename to tests/system/providers/google/marketing_platform/example_campaign_manager.py
index 4c32391fb0..bd02ba3bc5 100644
--- a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py
+++ b/tests/system/providers/google/marketing_platform/example_campaign_manager.py
@@ -23,6 +23,7 @@ import time
 from datetime import datetime
 
 from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.marketing_platform.operators.campaign_manager import (
     GoogleCampaignManagerBatchInsertConversionsOperator,
     GoogleCampaignManagerBatchUpdateConversionsOperator,
@@ -34,14 +35,20 @@ from airflow.providers.google.marketing_platform.operators.campaign_manager impo
 from airflow.providers.google.marketing_platform.sensors.campaign_manager import (
     GoogleCampaignManagerReportSensor,
 )
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_campaign_manager"
 
 PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789")
 FLOODLIGHT_ACTIVITY_ID = int(os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345))
 FLOODLIGHT_CONFIGURATION_ID = int(os.environ.get("FLOODLIGHT_CONFIGURATION_ID", 12345))
 ENCRYPTION_ENTITY_ID = int(os.environ.get("ENCRYPTION_ENTITY_ID", 12345))
 DEVICE_ID = os.environ.get("DEVICE_ID", "12345")
-BUCKET = os.environ.get("MARKETING_BUCKET", "test-cm-bucket")
-REPORT_NAME = "test-report"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+REPORT_NAME = f"report_{DAG_ID}_{ENV_ID}"
 REPORT = {
     "type": "STANDARD",
     "name": REPORT_NAME,
@@ -84,11 +91,16 @@ CONVERSION_UPDATE = {
 }
 
 with models.DAG(
-    "example_campaign_manager",
+    DAG_ID,
     schedule_interval='@once',  # Override to match your needs,
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "campaign"],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
     # [START howto_campaign_manager_insert_report_operator]
     create_report = GoogleCampaignManagerInsertReportOperator(
         profile_id=PROFILE_ID, report=REPORT, task_id="create_report"
@@ -119,24 +131,22 @@ with models.DAG(
         report_id=report_id,
         file_id=file_id,
         report_name="test_report.csv",
-        bucket_name=BUCKET,
+        bucket_name=BUCKET_NAME,
     )
     # [END howto_campaign_manager_get_report_operator]
 
     # [START howto_campaign_manager_delete_report_operator]
     delete_report = GoogleCampaignManagerDeleteReportOperator(
-        profile_id=PROFILE_ID, report_name=REPORT_NAME, task_id="delete_report"
+        profile_id=PROFILE_ID,
+        report_name=REPORT_NAME,
+        task_id="delete_report",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_campaign_manager_delete_report_operator]
 
-    wait_for_report >> get_report >> delete_report
-
-    # Task dependencies created via `XComArgs`:
-    #   create_report >> run_report
-    #   create_report >> wait_for_report
-    #   create_report >> get_report
-    #   run_report >> get_report
-    #   run_report >> wait_for_report
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
 
     # [START howto_campaign_manager_insert_conversions]
     insert_conversion = GoogleCampaignManagerBatchInsertConversionsOperator(
@@ -161,9 +171,28 @@ with models.DAG(
     )
     # [END howto_campaign_manager_update_conversions]
 
-    insert_conversion >> update_conversion
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_report
+        # TEST BODY
+        >> run_report
+        >> wait_for_report
+        >> get_report
+        >> insert_conversion
+        >> update_conversion
+        # TEST TEARDOWN
+        >> delete_report
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
+from tests.system.utils import get_test_run  # noqa: E402
 
-if __name__ == "__main__":
-    dag.clear()
-    dag.run()
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)