You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/28 21:34:52 UTC

[airflow] branch main updated: Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fb478c0  Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)
fb478c0 is described below

commit fb478c00cdc5e78d5e85fe5ac103707c829be2fb
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Sun Nov 28 16:34:22 2021 -0500

    Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)
---
 .../operators/campaign_manager.py                  |   5 +-
 .../marketing_platform/operators/display_video.py  |   5 +-
 .../marketing_platform/operators/search_ads.py     |   5 +-
 .../operators/test_campaign_manager.py             | 153 ++++++++++++------
 .../operators/test_display_video.py                | 174 ++++++++++++++-------
 .../operators/test_search_ads.py                   |  90 +++++++++--
 6 files changed, 307 insertions(+), 125 deletions(-)

diff --git a/airflow/providers/google/marketing_platform/operators/campaign_manager.py b/airflow/providers/google/marketing_platform/operators/campaign_manager.py
index ccca975..b44196e 100644
--- a/airflow/providers/google/marketing_platform/operators/campaign_manager.py
+++ b/airflow/providers/google/marketing_platform/operators/campaign_manager.py
@@ -204,7 +204,7 @@ class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
         self.api_version = api_version
         self.chunk_size = chunk_size
         self.gzip = gzip
-        self.bucket_name = self._set_bucket_name(bucket_name)
+        self.bucket_name = bucket_name
         self.report_name = report_name
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
@@ -254,8 +254,9 @@ class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
 
             temp_file.flush()
             # Upload the local file to bucket
+            bucket_name = self._set_bucket_name(self.bucket_name)
             gcs_hook.upload(
-                bucket_name=self.bucket_name,
+                bucket_name=bucket_name,
                 object_name=report_name,
                 gzip=self.gzip,
                 filename=temp_file.name,
diff --git a/airflow/providers/google/marketing_platform/operators/display_video.py b/airflow/providers/google/marketing_platform/operators/display_video.py
index 307df1b..6799f47 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -261,7 +261,7 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
         self.report_id = report_id
         self.chunk_size = chunk_size
         self.gzip = gzip
-        self.bucket_name = self._set_bucket_name(bucket_name)
+        self.bucket_name = bucket_name
         self.report_name = report_name
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
@@ -309,8 +309,9 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
 
             temp_file.flush()
             # Upload the local file to bucket
+            bucket_name = self._set_bucket_name(self.bucket_name)
             gcs_hook.upload(
-                bucket_name=self.bucket_name,
+                bucket_name=bucket_name,
                 object_name=report_name,
                 gzip=self.gzip,
                 filename=temp_file.name,
diff --git a/airflow/providers/google/marketing_platform/operators/search_ads.py b/airflow/providers/google/marketing_platform/operators/search_ads.py
index 77f790f..8e904d1 100644
--- a/airflow/providers/google/marketing_platform/operators/search_ads.py
+++ b/airflow/providers/google/marketing_platform/operators/search_ads.py
@@ -172,7 +172,7 @@ class GoogleSearchAdsDownloadReportOperator(BaseOperator):
         self.report_id = report_id
         self.chunk_size = chunk_size
         self.gzip = gzip
-        self.bucket_name = self._set_bucket_name(bucket_name)
+        self.bucket_name = bucket_name
         self.report_name = report_name
         self.impersonation_chain = impersonation_chain
 
@@ -232,8 +232,9 @@ class GoogleSearchAdsDownloadReportOperator(BaseOperator):
 
             temp_file.flush()
 
+            bucket_name = self._set_bucket_name(self.bucket_name)
             gcs_hook.upload(
-                bucket_name=self.bucket_name,
+                bucket_name=bucket_name,
                 object_name=report_name,
                 gzip=self.gzip,
                 filename=temp_file.name,
diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
index 49eb3ac..5f57567 100644
--- a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
+++ b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
@@ -19,6 +19,9 @@ import json
 from tempfile import NamedTemporaryFile
 from unittest import TestCase, mock
 
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
 from airflow.providers.google.marketing_platform.operators.campaign_manager import (
     GoogleCampaignManagerBatchInsertConversionsOperator,
     GoogleCampaignManagerBatchUpdateConversionsOperator,
@@ -27,6 +30,8 @@ from airflow.providers.google.marketing_platform.operators.campaign_manager impo
     GoogleCampaignManagerInsertReportOperator,
     GoogleCampaignManagerRunReportOperator,
 )
+from airflow.utils import timezone
+from airflow.utils.session import create_session
 
 API_VERSION = "api_version"
 GCP_CONN_ID = "google_cloud_default"
@@ -46,6 +51,14 @@ CONVERSION = {
     ],
 }
 
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+PROFILE_ID = "profile_id"
+REPORT_ID = "report_id"
+FILE_ID = "file_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+TEMP_FILE_NAME = "test"
+
 
 class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
     @mock.patch(
@@ -53,11 +66,9 @@ class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
     )
     @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
     def test_execute(self, mock_base_op, hook_mock):
-        profile_id = "PROFILE_ID"
-        report_id = "REPORT_ID"
         op = GoogleCampaignManagerDeleteReportOperator(
-            profile_id=profile_id,
-            report_id=report_id,
+            profile_id=PROFILE_ID,
+            report_id=REPORT_ID,
             api_version=API_VERSION,
             task_id="test_task",
         )
@@ -69,11 +80,19 @@ class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
             impersonation_chain=None,
         )
         hook_mock.return_value.delete_report.assert_called_once_with(
-            profile_id=profile_id, report_id=report_id
+            profile_id=PROFILE_ID, report_id=REPORT_ID
         )
 
 
-class TestGoogleCampaignManagerGetReportOperator(TestCase):
+class TestGoogleCampaignManagerDownloadReportOperator(TestCase):
+    def setUp(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
+    def tearDown(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
     @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
     @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
     @mock.patch(
@@ -94,24 +113,17 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
         tempfile_mock,
         http_mock,
     ):
-        profile_id = "PROFILE_ID"
-        report_id = "REPORT_ID"
-        file_id = "FILE_ID"
-        bucket_name = "test_bucket"
-        report_name = "test_report.csv"
-        temp_file_name = "TEST"
-
         http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
             None,
             True,
         )
-        tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = temp_file_name
+        tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME
         op = GoogleCampaignManagerDownloadReportOperator(
-            profile_id=profile_id,
-            report_id=report_id,
-            file_id=file_id,
-            bucket_name=bucket_name,
-            report_name=report_name,
+            profile_id=PROFILE_ID,
+            report_id=REPORT_ID,
+            file_id=FILE_ID,
+            bucket_name=BUCKET_NAME,
+            report_name=REPORT_NAME,
             api_version=API_VERSION,
             task_id="test_task",
         )
@@ -123,7 +135,7 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
             impersonation_chain=None,
         )
         hook_mock.return_value.get_report_file.assert_called_once_with(
-            profile_id=profile_id, report_id=report_id, file_id=file_id
+            profile_id=PROFILE_ID, report_id=REPORT_ID, file_id=FILE_ID
         )
         gcs_hook_mock.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -131,13 +143,70 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
             impersonation_chain=None,
         )
         gcs_hook_mock.return_value.upload.assert_called_once_with(
-            bucket_name=bucket_name,
-            object_name=report_name + ".gz",
+            bucket_name=BUCKET_NAME,
+            object_name=REPORT_NAME + ".gz",
+            gzip=True,
+            filename=TEMP_FILE_NAME,
+            mime_type="text/csv",
+        )
+        xcom_mock.assert_called_once_with(None, key="report_name", value=REPORT_NAME + ".gz")
+
+    @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+    @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
+    )
+    @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.GCSHook")
+    def test_set_bucket_name(
+        self,
+        test_bucket_name,
+        gcs_hook_mock,
+        hook_mock,
+        tempfile_mock,
+        http_mock,
+    ):
+        http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
+            None,
+            True,
+        )
+        tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME
+
+        dag = DAG(
+            dag_id="test_set_bucket_name",
+            start_date=DEFAULT_DATE,
+            schedule_interval=None,
+            catchup=False,
+        )
+
+        if BUCKET_NAME not in test_bucket_name:
+
+            @dag.task
+            def f():
+                return BUCKET_NAME
+
+            taskflow_op = f()
+            taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        op = GoogleCampaignManagerDownloadReportOperator(
+            profile_id=PROFILE_ID,
+            report_id=REPORT_ID,
+            file_id=FILE_ID,
+            bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+            report_name=REPORT_NAME,
+            api_version=API_VERSION,
+            task_id="test_task",
+            dag=dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        gcs_hook_mock.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            object_name=REPORT_NAME + ".gz",
             gzip=True,
-            filename=temp_file_name,
+            filename=TEMP_FILE_NAME,
             mime_type="text/csv",
         )
-        xcom_mock.assert_called_once_with(None, key="report_name", value=report_name + ".gz")
 
 
 class TestGoogleCampaignManagerInsertReportOperator(TestCase):
@@ -150,14 +219,12 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
         "campaign_manager.GoogleCampaignManagerInsertReportOperator.xcom_push"
     )
     def test_execute(self, xcom_mock, mock_base_op, hook_mock):
-        profile_id = "PROFILE_ID"
         report = {"report": "test"}
-        report_id = "test"
 
-        hook_mock.return_value.insert_report.return_value = {"id": report_id}
+        hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
 
         op = GoogleCampaignManagerInsertReportOperator(
-            profile_id=profile_id,
+            profile_id=PROFILE_ID,
             report=report,
             api_version=API_VERSION,
             task_id="test_task",
@@ -169,17 +236,16 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
             api_version=API_VERSION,
             impersonation_chain=None,
         )
-        hook_mock.return_value.insert_report.assert_called_once_with(profile_id=profile_id, report=report)
-        xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)
+        hook_mock.return_value.insert_report.assert_called_once_with(profile_id=PROFILE_ID, report=report)
+        xcom_mock.assert_called_once_with(None, key="report_id", value=REPORT_ID)
 
     def test_prepare_template(self):
-        profile_id = "PROFILE_ID"
         report = {"key": "value"}
         with NamedTemporaryFile("w+", suffix=".json") as f:
             f.write(json.dumps(report))
             f.flush()
             op = GoogleCampaignManagerInsertReportOperator(
-                profile_id=profile_id,
+                profile_id=PROFILE_ID,
                 report=f.name,
                 api_version=API_VERSION,
                 task_id="test_task",
@@ -200,16 +266,13 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
         "campaign_manager.GoogleCampaignManagerRunReportOperator.xcom_push"
     )
     def test_execute(self, xcom_mock, mock_base_op, hook_mock):
-        profile_id = "PROFILE_ID"
-        report_id = "REPORT_ID"
-        file_id = "FILE_ID"
         synchronous = True
 
-        hook_mock.return_value.run_report.return_value = {"id": file_id}
+        hook_mock.return_value.run_report.return_value = {"id": FILE_ID}
 
         op = GoogleCampaignManagerRunReportOperator(
-            profile_id=profile_id,
-            report_id=report_id,
+            profile_id=PROFILE_ID,
+            report_id=REPORT_ID,
             synchronous=synchronous,
             api_version=API_VERSION,
             task_id="test_task",
@@ -222,9 +285,9 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
             impersonation_chain=None,
         )
         hook_mock.return_value.run_report.assert_called_once_with(
-            profile_id=profile_id, report_id=report_id, synchronous=synchronous
+            profile_id=PROFILE_ID, report_id=REPORT_ID, synchronous=synchronous
         )
-        xcom_mock.assert_called_once_with(None, key="file_id", value=file_id)
+        xcom_mock.assert_called_once_with(None, key="file_id", value=FILE_ID)
 
 
 class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
@@ -233,10 +296,9 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
     )
     @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
     def test_execute(self, mock_base_op, hook_mock):
-        profile_id = "PROFILE_ID"
         op = GoogleCampaignManagerBatchInsertConversionsOperator(
             task_id="insert_conversion",
-            profile_id=profile_id,
+            profile_id=PROFILE_ID,
             conversions=[CONVERSION],
             encryption_source="AD_SERVING",
             encryption_entity_type="DCM_ADVERTISER",
@@ -244,7 +306,7 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
         )
         op.execute(None)
         hook_mock.return_value.conversions_batch_insert.assert_called_once_with(
-            profile_id=profile_id,
+            profile_id=PROFILE_ID,
             conversions=[CONVERSION],
             encryption_source="AD_SERVING",
             encryption_entity_type="DCM_ADVERTISER",
@@ -259,10 +321,9 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
     )
     @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
     def test_execute(self, mock_base_op, hook_mock):
-        profile_id = "PROFILE_ID"
         op = GoogleCampaignManagerBatchUpdateConversionsOperator(
             task_id="update_conversion",
-            profile_id=profile_id,
+            profile_id=PROFILE_ID,
             conversions=[CONVERSION],
             encryption_source="AD_SERVING",
             encryption_entity_type="DCM_ADVERTISER",
@@ -270,7 +331,7 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
         )
         op.execute(None)
         hook_mock.return_value.conversions_batch_update.assert_called_once_with(
-            profile_id=profile_id,
+            profile_id=PROFILE_ID,
             conversions=[CONVERSION],
             encryption_source="AD_SERVING",
             encryption_entity_type="DCM_ADVERTISER",
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video.py b/tests/providers/google/marketing_platform/operators/test_display_video.py
index eb00b9d..2f1842d 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -20,6 +20,9 @@ from tempfile import NamedTemporaryFile
 from typing import Optional
 from unittest import TestCase, mock
 
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
 from airflow.providers.google.marketing_platform.operators.display_video import (
     GoogleDisplayVideo360CreateReportOperator,
     GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
@@ -30,12 +33,21 @@ from airflow.providers.google.marketing_platform.operators.display_video import
     GoogleDisplayVideo360SDFtoGCSOperator,
     GoogleDisplayVideo360UploadLineItemsOperator,
 )
+from airflow.utils import timezone
+from airflow.utils.session import create_session
 
 API_VERSION = "api_version"
 GCP_CONN_ID = "google_cloud_default"
 DELEGATE_TO: Optional[str] = None
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
 
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+REPORT_ID = "report_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+QUERY_ID = FILENAME = "test"
+OBJECT_NAME = "object_name"
+
 
 class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
     @mock.patch(
@@ -45,11 +57,9 @@ class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
     )
-    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
-    def test_execute(self, mock_base_op, hook_mock, xcom_mock):
+    def test_execute(self, hook_mock, xcom_mock):
         body = {"body": "test"}
-        query_id = "TEST"
-        hook_mock.return_value.create_query.return_value = {"queryId": query_id}
+        hook_mock.return_value.create_query.return_value = {"queryId": QUERY_ID}
         op = GoogleDisplayVideo360CreateReportOperator(
             body=body, api_version=API_VERSION, task_id="test_task"
         )
@@ -61,7 +71,7 @@ class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
             impersonation_chain=None,
         )
         hook_mock.return_value.create_query.assert_called_once_with(query=body)
-        xcom_mock.assert_called_once_with(None, key="report_id", value=query_id)
+        xcom_mock.assert_called_once_with(None, key="report_id", value=QUERY_ID)
 
     def test_prepare_template(self):
         body = {"key": "value"}
@@ -81,11 +91,9 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
     )
-    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
-    def test_execute(self, mock_base_op, hook_mock):
-        query_id = "QUERY_ID"
+    def test_execute(self, hook_mock):
         op = GoogleDisplayVideo360DeleteReportOperator(
-            report_id=query_id, api_version=API_VERSION, task_id="test_task"
+            report_id=QUERY_ID, api_version=API_VERSION, task_id="test_task"
         )
         op.execute(context=None)
         hook_mock.assert_called_once_with(
@@ -94,10 +102,18 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
             api_version=API_VERSION,
             impersonation_chain=None,
         )
-        hook_mock.return_value.delete_query.assert_called_once_with(query_id=query_id)
+        hook_mock.return_value.delete_query.assert_called_once_with(query_id=QUERY_ID)
+
 
+class TestGoogleDisplayVideo360DownloadReportOperator(TestCase):
+    def setUp(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
+    def tearDown(self):
+        with create_session() as session:
+            session.query(TI).delete()
 
-class TestGoogleDisplayVideo360GetReportOperator(TestCase):
     @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
     @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
     @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
@@ -109,10 +125,8 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
     )
-    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
     def test_execute(
         self,
-        mock_base_op,
         mock_hook,
         mock_gcs_hook,
         mock_xcom,
@@ -120,11 +134,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
         mock_request,
         mock_shutil,
     ):
-        report_id = "REPORT_ID"
-        bucket_name = "BUCKET"
-        report_name = "TEST.csv"
-        filename = "test"
-        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
         mock_hook.return_value.get_query.return_value = {
             "metadata": {
                 "running": False,
@@ -132,10 +142,10 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
             }
         }
         op = GoogleDisplayVideo360DownloadReportOperator(
-            report_id=report_id,
+            report_id=REPORT_ID,
             api_version=API_VERSION,
-            bucket_name=bucket_name,
-            report_name=report_name,
+            bucket_name=BUCKET_NAME,
+            report_name=REPORT_NAME,
             task_id="test_task",
         )
         op.execute(context=None)
@@ -145,7 +155,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
             api_version=API_VERSION,
             impersonation_chain=None,
         )
-        mock_hook.return_value.get_query.assert_called_once_with(query_id=report_id)
+        mock_hook.return_value.get_query.assert_called_once_with(query_id=REPORT_ID)
 
         mock_gcs_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -153,25 +163,82 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
             impersonation_chain=None,
         )
         mock_gcs_hook.return_value.upload.assert_called_once_with(
-            bucket_name=bucket_name,
-            filename=filename,
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
+            gzip=True,
+            mime_type="text/csv",
+            object_name=REPORT_NAME + ".gz",
+        )
+        mock_xcom.assert_called_once_with(None, key="report_name", value=REPORT_NAME + ".gz")
+
+    @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_set_bucket_name(
+        self,
+        test_bucket_name,
+        mock_hook,
+        mock_gcs_hook,
+        mock_temp,
+        mock_request,
+        mock_shutil,
+    ):
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
+        mock_hook.return_value.get_query.return_value = {
+            "metadata": {
+                "running": False,
+                "googleCloudStoragePathForLatestReport": "test",
+            }
+        }
+
+        dag = DAG(
+            dag_id="test_set_bucket_name",
+            start_date=DEFAULT_DATE,
+            schedule_interval=None,
+            catchup=False,
+        )
+
+        if BUCKET_NAME not in test_bucket_name:
+
+            @dag.task
+            def f():
+                return BUCKET_NAME
+
+            taskflow_op = f()
+            taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        op = GoogleDisplayVideo360DownloadReportOperator(
+            report_id=REPORT_ID,
+            api_version=API_VERSION,
+            bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+            report_name=REPORT_NAME,
+            task_id="test_task",
+            dag=dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
             gzip=True,
             mime_type="text/csv",
-            object_name=report_name + ".gz",
+            object_name=REPORT_NAME + ".gz",
         )
-        mock_xcom.assert_called_once_with(None, key="report_name", value=report_name + ".gz")
 
 
 class TestGoogleDisplayVideo360RunReportOperator(TestCase):
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
     )
-    @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
-    def test_execute(self, mock_base_op, hook_mock):
-        report_id = "QUERY_ID"
+    def test_execute(self, hook_mock):
         parameters = {"param": "test"}
         op = GoogleDisplayVideo360RunReportOperator(
-            report_id=report_id,
+            report_id=REPORT_ID,
             parameters=parameters,
             api_version=API_VERSION,
             task_id="test_task",
@@ -183,7 +250,7 @@ class TestGoogleDisplayVideo360RunReportOperator(TestCase):
             api_version=API_VERSION,
             impersonation_chain=None,
         )
-        hook_mock.return_value.run_query.assert_called_once_with(query_id=report_id, params=parameters)
+        hook_mock.return_value.run_query.assert_called_once_with(query_id=REPORT_ID, params=parameters)
 
 
 class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
@@ -199,16 +266,13 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
             "format": "format",
             "fileSpec": "file_spec",
         }
-        bucket_name = "bucket_name"
-        object_name = "object_name"
-        filename = "test"
-        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
         gzip = False
 
         op = GoogleDisplayVideo360DownloadLineItemsOperator(
             request_body=request_body,
-            bucket_name=bucket_name,
-            object_name=object_name,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
             gzip=gzip,
             api_version=API_VERSION,
             gcp_conn_id=GCP_CONN_ID,
@@ -220,9 +284,9 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
         op.execute(context=None)
 
         gcs_hook_mock.return_value.upload.assert_called_with(
-            bucket_name=bucket_name,
-            object_name=object_name,
-            filename=filename,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
+            filename=FILENAME,
             gzip=gzip,
             mime_type="text/csv",
         )
@@ -248,16 +312,13 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
     )
     @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
     def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile):
-        filename = "filename"
-        object_name = "object_name"
-        bucket_name = "bucket_name"
         line_items = "holy_hand_grenade"
         gcs_hook_mock.return_value.download.return_value = line_items
-        mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+        mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
 
         op = GoogleDisplayVideo360UploadLineItemsOperator(
-            bucket_name=bucket_name,
-            object_name=object_name,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
             api_version=API_VERSION,
             gcp_conn_id=GCP_CONN_ID,
             task_id="test_task",
@@ -277,9 +338,9 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
         )
 
         gcs_hook_mock.return_value.download.assert_called_once_with(
-            bucket_name=bucket_name,
-            object_name=object_name,
-            filename=filename,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
+            filename=FILENAME,
         )
         hook_mock.return_value.upload_line_items.assert_called_once()
         hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items)
@@ -294,19 +355,16 @@ class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
     def test_execute(self, mock_temp, gcs_mock_hook, mock_hook):
         operation_name = "operation_name"
         operation = {"key": "value"}
-        bucket_name = "bucket_name"
-        object_name = "object_name"
-        filename = "filename"
         gzip = False
 
         # mock_hook.return_value.create_sdf_download_operation.return_value = response_name
         mock_hook.return_value.get_sdf_download_operation.return_value = operation
-        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
 
         op = GoogleDisplayVideo360SDFtoGCSOperator(
             operation_name=operation_name,
-            bucket_name=bucket_name,
-            object_name=object_name,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
             gzip=gzip,
             api_version=API_VERSION,
             gcp_conn_id=GCP_CONN_ID,
@@ -348,9 +406,9 @@ class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
 
         gcs_mock_hook.return_value.upload.assert_called_once()
         gcs_mock_hook.return_value.upload.assert_called_once_with(
-            bucket_name=bucket_name,
-            object_name=object_name,
-            filename=filename,
+            bucket_name=BUCKET_NAME,
+            object_name=OBJECT_NAME,
+            filename=FILENAME,
             gzip=gzip,
         )
 
diff --git a/tests/providers/google/marketing_platform/operators/test_search_ads.py b/tests/providers/google/marketing_platform/operators/test_search_ads.py
index cc2ed7d..068d2b0 100644
--- a/tests/providers/google/marketing_platform/operators/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/operators/test_search_ads.py
@@ -19,14 +19,26 @@ import json
 from tempfile import NamedTemporaryFile
 from unittest import TestCase, mock
 
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
 from airflow.providers.google.marketing_platform.operators.search_ads import (
     GoogleSearchAdsDownloadReportOperator,
     GoogleSearchAdsInsertReportOperator,
 )
+from airflow.utils import timezone
+from airflow.utils.session import create_session
 
 API_VERSION = "api_version"
 GCP_CONN_ID = "google_cloud_default"
 
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+END_DATE = timezone.datetime(2021, 1, 2)
+REPORT_ID = "report_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+FILE_NAME = "test"
+
 
 class TestGoogleSearchAdsInsertReportOperator(TestCase):
     @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
@@ -37,8 +49,7 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
     )
     def test_execute(self, xcom_mock, mock_base_op, hook_mock):
         report = {"report": "test"}
-        report_id = "TEST"
-        hook_mock.return_value.insert_report.return_value = {"id": report_id}
+        hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
         op = GoogleSearchAdsInsertReportOperator(report=report, api_version=API_VERSION, task_id="test_task")
         op.execute(context=None)
         hook_mock.assert_called_once_with(
@@ -48,7 +59,7 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
             impersonation_chain=None,
         )
         hook_mock.return_value.insert_report.assert_called_once_with(report=report)
-        xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)
+        xcom_mock.assert_called_once_with(None, key="report_id", value=REPORT_ID)
 
     def test_prepare_template(self):
         report = {"key": "value"}
@@ -65,19 +76,23 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
 
 
 class TestGoogleSearchAdsDownloadReportOperator(TestCase):
+    def setUp(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
+    def tearDown(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
     @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
     @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
     @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
-    @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
     @mock.patch(
         "airflow.providers.google.marketing_platform."
         "operators.search_ads.GoogleSearchAdsDownloadReportOperator.xcom_push"
     )
-    def test_execute(self, xcom_mock, mock_base_op, hook_mock, gcs_hook_mock, tempfile_mock):
-        report_id = "REPORT_ID"
-        file_name = "TEST"
+    def test_execute(self, xcom_mock, hook_mock, gcs_hook_mock, tempfile_mock):
         temp_file_name = "TEMP"
-        bucket_name = "test"
         data = b"data"
 
         hook_mock.return_value.get.return_value = {"files": [0], "isReportReady": True}
@@ -85,9 +100,9 @@ class TestGoogleSearchAdsDownloadReportOperator(TestCase):
         tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
 
         op = GoogleSearchAdsDownloadReportOperator(
-            report_id=report_id,
-            report_name=file_name,
-            bucket_name=bucket_name,
+            report_id=REPORT_ID,
+            report_name=FILE_NAME,
+            bucket_name=BUCKET_NAME,
             api_version=API_VERSION,
             task_id="test_task",
         )
@@ -98,12 +113,57 @@ class TestGoogleSearchAdsDownloadReportOperator(TestCase):
             api_version=API_VERSION,
             impersonation_chain=None,
         )
-        hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0, report_id=report_id)
+        hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0, report_id=REPORT_ID)
         tempfile_mock.return_value.__enter__.return_value.write.assert_called_once_with(data)
         gcs_hook_mock.return_value.upload.assert_called_once_with(
-            bucket_name=bucket_name,
+            bucket_name=BUCKET_NAME,
+            gzip=True,
+            object_name=FILE_NAME + ".csv.gz",
+            filename=temp_file_name,
+        )
+        xcom_mock.assert_called_once_with(None, key="file_name", value=FILE_NAME + ".csv.gz")
+
+    @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+    @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
+    @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
+    def test_set_bucket_name(self, test_bucket_name, hook_mock, gcs_hook_mock, tempfile_mock):
+        temp_file_name = "TEMP"
+        data = b"data"
+
+        hook_mock.return_value.get.return_value = {"files": [0], "isReportReady": True}
+        hook_mock.return_value.get_file.return_value = data
+        tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
+
+        dag = DAG(
+            dag_id="test_set_bucket_name",
+            start_date=DEFAULT_DATE,
+            schedule_interval=None,
+            catchup=False,
+        )
+
+        if BUCKET_NAME not in test_bucket_name:
+
+            @dag.task
+            def f():
+                return BUCKET_NAME
+
+            taskflow_op = f()
+            taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        op = GoogleSearchAdsDownloadReportOperator(
+            report_id=REPORT_ID,
+            report_name=FILE_NAME,
+            bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+            api_version=API_VERSION,
+            task_id="test_task",
+            dag=dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        gcs_hook_mock.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
             gzip=True,
-            object_name=file_name + ".csv.gz",
+            object_name=FILE_NAME + ".csv.gz",
             filename=temp_file_name,
         )
-        xcom_mock.assert_called_once_with(None, key="file_name", value=file_name + ".csv.gz")