You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/28 21:34:52 UTC
[airflow] branch main updated: Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb478c0 Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)
fb478c0 is described below
commit fb478c00cdc5e78d5e85fe5ac103707c829be2fb
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Sun Nov 28 16:34:22 2021 -0500
Move `bucket_name` validation out of `__init__` in Google Marketing Platform operators (#19383)
---
.../operators/campaign_manager.py | 5 +-
.../marketing_platform/operators/display_video.py | 5 +-
.../marketing_platform/operators/search_ads.py | 5 +-
.../operators/test_campaign_manager.py | 153 ++++++++++++------
.../operators/test_display_video.py | 174 ++++++++++++++-------
.../operators/test_search_ads.py | 90 +++++++++--
6 files changed, 307 insertions(+), 125 deletions(-)
diff --git a/airflow/providers/google/marketing_platform/operators/campaign_manager.py b/airflow/providers/google/marketing_platform/operators/campaign_manager.py
index ccca975..b44196e 100644
--- a/airflow/providers/google/marketing_platform/operators/campaign_manager.py
+++ b/airflow/providers/google/marketing_platform/operators/campaign_manager.py
@@ -204,7 +204,7 @@ class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
self.api_version = api_version
self.chunk_size = chunk_size
self.gzip = gzip
- self.bucket_name = self._set_bucket_name(bucket_name)
+ self.bucket_name = bucket_name
self.report_name = report_name
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -254,8 +254,9 @@ class GoogleCampaignManagerDownloadReportOperator(BaseOperator):
temp_file.flush()
# Upload the local file to bucket
+ bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
- bucket_name=self.bucket_name,
+ bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
diff --git a/airflow/providers/google/marketing_platform/operators/display_video.py b/airflow/providers/google/marketing_platform/operators/display_video.py
index 307df1b..6799f47 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -261,7 +261,7 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
- self.bucket_name = self._set_bucket_name(bucket_name)
+ self.bucket_name = bucket_name
self.report_name = report_name
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
@@ -309,8 +309,9 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
temp_file.flush()
# Upload the local file to bucket
+ bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
- bucket_name=self.bucket_name,
+ bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
diff --git a/airflow/providers/google/marketing_platform/operators/search_ads.py b/airflow/providers/google/marketing_platform/operators/search_ads.py
index 77f790f..8e904d1 100644
--- a/airflow/providers/google/marketing_platform/operators/search_ads.py
+++ b/airflow/providers/google/marketing_platform/operators/search_ads.py
@@ -172,7 +172,7 @@ class GoogleSearchAdsDownloadReportOperator(BaseOperator):
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
- self.bucket_name = self._set_bucket_name(bucket_name)
+ self.bucket_name = bucket_name
self.report_name = report_name
self.impersonation_chain = impersonation_chain
@@ -232,8 +232,9 @@ class GoogleSearchAdsDownloadReportOperator(BaseOperator):
temp_file.flush()
+ bucket_name = self._set_bucket_name(self.bucket_name)
gcs_hook.upload(
- bucket_name=self.bucket_name,
+ bucket_name=bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
index 49eb3ac..5f57567 100644
--- a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
+++ b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py
@@ -19,6 +19,9 @@ import json
from tempfile import NamedTemporaryFile
from unittest import TestCase, mock
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.campaign_manager import (
GoogleCampaignManagerBatchInsertConversionsOperator,
GoogleCampaignManagerBatchUpdateConversionsOperator,
@@ -27,6 +30,8 @@ from airflow.providers.google.marketing_platform.operators.campaign_manager impo
GoogleCampaignManagerInsertReportOperator,
GoogleCampaignManagerRunReportOperator,
)
+from airflow.utils import timezone
+from airflow.utils.session import create_session
API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
@@ -46,6 +51,14 @@ CONVERSION = {
],
}
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+PROFILE_ID = "profile_id"
+REPORT_ID = "report_id"
+FILE_ID = "file_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+TEMP_FILE_NAME = "test"
+
class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
@mock.patch(
@@ -53,11 +66,9 @@ class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
- profile_id = "PROFILE_ID"
- report_id = "REPORT_ID"
op = GoogleCampaignManagerDeleteReportOperator(
- profile_id=profile_id,
- report_id=report_id,
+ profile_id=PROFILE_ID,
+ report_id=REPORT_ID,
api_version=API_VERSION,
task_id="test_task",
)
@@ -69,11 +80,19 @@ class TestGoogleCampaignManagerDeleteReportOperator(TestCase):
impersonation_chain=None,
)
hook_mock.return_value.delete_report.assert_called_once_with(
- profile_id=profile_id, report_id=report_id
+ profile_id=PROFILE_ID, report_id=REPORT_ID
)
-class TestGoogleCampaignManagerGetReportOperator(TestCase):
+class TestGoogleCampaignManagerDownloadReportOperator(TestCase):
+ def setUp(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
+ def tearDown(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
@mock.patch(
@@ -94,24 +113,17 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
tempfile_mock,
http_mock,
):
- profile_id = "PROFILE_ID"
- report_id = "REPORT_ID"
- file_id = "FILE_ID"
- bucket_name = "test_bucket"
- report_name = "test_report.csv"
- temp_file_name = "TEST"
-
http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
None,
True,
)
- tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = temp_file_name
+ tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME
op = GoogleCampaignManagerDownloadReportOperator(
- profile_id=profile_id,
- report_id=report_id,
- file_id=file_id,
- bucket_name=bucket_name,
- report_name=report_name,
+ profile_id=PROFILE_ID,
+ report_id=REPORT_ID,
+ file_id=FILE_ID,
+ bucket_name=BUCKET_NAME,
+ report_name=REPORT_NAME,
api_version=API_VERSION,
task_id="test_task",
)
@@ -123,7 +135,7 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
impersonation_chain=None,
)
hook_mock.return_value.get_report_file.assert_called_once_with(
- profile_id=profile_id, report_id=report_id, file_id=file_id
+ profile_id=PROFILE_ID, report_id=REPORT_ID, file_id=FILE_ID
)
gcs_hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -131,13 +143,70 @@ class TestGoogleCampaignManagerGetReportOperator(TestCase):
impersonation_chain=None,
)
gcs_hook_mock.return_value.upload.assert_called_once_with(
- bucket_name=bucket_name,
- object_name=report_name + ".gz",
+ bucket_name=BUCKET_NAME,
+ object_name=REPORT_NAME + ".gz",
+ gzip=True,
+ filename=TEMP_FILE_NAME,
+ mime_type="text/csv",
+ )
+ xcom_mock.assert_called_once_with(None, key="report_name", value=REPORT_NAME + ".gz")
+
+ @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+ @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.http")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile")
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
+ )
+ @mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.GCSHook")
+ def test_set_bucket_name(
+ self,
+ test_bucket_name,
+ gcs_hook_mock,
+ hook_mock,
+ tempfile_mock,
+ http_mock,
+ ):
+ http_mock.MediaIoBaseDownload.return_value.next_chunk.return_value = (
+ None,
+ True,
+ )
+ tempfile_mock.NamedTemporaryFile.return_value.__enter__.return_value.name = TEMP_FILE_NAME
+
+ dag = DAG(
+ dag_id="test_set_bucket_name",
+ start_date=DEFAULT_DATE,
+ schedule_interval=None,
+ catchup=False,
+ )
+
+ if BUCKET_NAME not in test_bucket_name:
+
+ @dag.task
+ def f():
+ return BUCKET_NAME
+
+ taskflow_op = f()
+ taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ op = GoogleCampaignManagerDownloadReportOperator(
+ profile_id=PROFILE_ID,
+ report_id=REPORT_ID,
+ file_id=FILE_ID,
+ bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+ report_name=REPORT_NAME,
+ api_version=API_VERSION,
+ task_id="test_task",
+ dag=dag,
+ )
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ gcs_hook_mock.return_value.upload.assert_called_once_with(
+ bucket_name=BUCKET_NAME,
+ object_name=REPORT_NAME + ".gz",
gzip=True,
- filename=temp_file_name,
+ filename=TEMP_FILE_NAME,
mime_type="text/csv",
)
- xcom_mock.assert_called_once_with(None, key="report_name", value=report_name + ".gz")
class TestGoogleCampaignManagerInsertReportOperator(TestCase):
@@ -150,14 +219,12 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
"campaign_manager.GoogleCampaignManagerInsertReportOperator.xcom_push"
)
def test_execute(self, xcom_mock, mock_base_op, hook_mock):
- profile_id = "PROFILE_ID"
report = {"report": "test"}
- report_id = "test"
- hook_mock.return_value.insert_report.return_value = {"id": report_id}
+ hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
op = GoogleCampaignManagerInsertReportOperator(
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
report=report,
api_version=API_VERSION,
task_id="test_task",
@@ -169,17 +236,16 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
api_version=API_VERSION,
impersonation_chain=None,
)
- hook_mock.return_value.insert_report.assert_called_once_with(profile_id=profile_id, report=report)
- xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)
+ hook_mock.return_value.insert_report.assert_called_once_with(profile_id=PROFILE_ID, report=report)
+ xcom_mock.assert_called_once_with(None, key="report_id", value=REPORT_ID)
def test_prepare_template(self):
- profile_id = "PROFILE_ID"
report = {"key": "value"}
with NamedTemporaryFile("w+", suffix=".json") as f:
f.write(json.dumps(report))
f.flush()
op = GoogleCampaignManagerInsertReportOperator(
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
report=f.name,
api_version=API_VERSION,
task_id="test_task",
@@ -200,16 +266,13 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
"campaign_manager.GoogleCampaignManagerRunReportOperator.xcom_push"
)
def test_execute(self, xcom_mock, mock_base_op, hook_mock):
- profile_id = "PROFILE_ID"
- report_id = "REPORT_ID"
- file_id = "FILE_ID"
synchronous = True
- hook_mock.return_value.run_report.return_value = {"id": file_id}
+ hook_mock.return_value.run_report.return_value = {"id": FILE_ID}
op = GoogleCampaignManagerRunReportOperator(
- profile_id=profile_id,
- report_id=report_id,
+ profile_id=PROFILE_ID,
+ report_id=REPORT_ID,
synchronous=synchronous,
api_version=API_VERSION,
task_id="test_task",
@@ -222,9 +285,9 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
impersonation_chain=None,
)
hook_mock.return_value.run_report.assert_called_once_with(
- profile_id=profile_id, report_id=report_id, synchronous=synchronous
+ profile_id=PROFILE_ID, report_id=REPORT_ID, synchronous=synchronous
)
- xcom_mock.assert_called_once_with(None, key="file_id", value=file_id)
+ xcom_mock.assert_called_once_with(None, key="file_id", value=FILE_ID)
class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
@@ -233,10 +296,9 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
- profile_id = "PROFILE_ID"
op = GoogleCampaignManagerBatchInsertConversionsOperator(
task_id="insert_conversion",
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
@@ -244,7 +306,7 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
)
op.execute(None)
hook_mock.return_value.conversions_batch_insert.assert_called_once_with(
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
@@ -259,10 +321,9 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator")
def test_execute(self, mock_base_op, hook_mock):
- profile_id = "PROFILE_ID"
op = GoogleCampaignManagerBatchUpdateConversionsOperator(
task_id="update_conversion",
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
@@ -270,7 +331,7 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
)
op.execute(None)
hook_mock.return_value.conversions_batch_update.assert_called_once_with(
- profile_id=profile_id,
+ profile_id=PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video.py b/tests/providers/google/marketing_platform/operators/test_display_video.py
index eb00b9d..2f1842d 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -20,6 +20,9 @@ from tempfile import NamedTemporaryFile
from typing import Optional
from unittest import TestCase, mock
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
@@ -30,12 +33,21 @@ from airflow.providers.google.marketing_platform.operators.display_video import
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)
+from airflow.utils import timezone
+from airflow.utils.session import create_session
API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
DELEGATE_TO: Optional[str] = None
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+REPORT_ID = "report_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+QUERY_ID = FILENAME = "test"
+OBJECT_NAME = "object_name"
+
class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
@mock.patch(
@@ -45,11 +57,9 @@ class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
)
- @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
- def test_execute(self, mock_base_op, hook_mock, xcom_mock):
+ def test_execute(self, hook_mock, xcom_mock):
body = {"body": "test"}
- query_id = "TEST"
- hook_mock.return_value.create_query.return_value = {"queryId": query_id}
+ hook_mock.return_value.create_query.return_value = {"queryId": QUERY_ID}
op = GoogleDisplayVideo360CreateReportOperator(
body=body, api_version=API_VERSION, task_id="test_task"
)
@@ -61,7 +71,7 @@ class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
impersonation_chain=None,
)
hook_mock.return_value.create_query.assert_called_once_with(query=body)
- xcom_mock.assert_called_once_with(None, key="report_id", value=query_id)
+ xcom_mock.assert_called_once_with(None, key="report_id", value=QUERY_ID)
def test_prepare_template(self):
body = {"key": "value"}
@@ -81,11 +91,9 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
)
- @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
- def test_execute(self, mock_base_op, hook_mock):
- query_id = "QUERY_ID"
+ def test_execute(self, hook_mock):
op = GoogleDisplayVideo360DeleteReportOperator(
- report_id=query_id, api_version=API_VERSION, task_id="test_task"
+ report_id=QUERY_ID, api_version=API_VERSION, task_id="test_task"
)
op.execute(context=None)
hook_mock.assert_called_once_with(
@@ -94,10 +102,18 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
api_version=API_VERSION,
impersonation_chain=None,
)
- hook_mock.return_value.delete_query.assert_called_once_with(query_id=query_id)
+ hook_mock.return_value.delete_query.assert_called_once_with(query_id=QUERY_ID)
+
+class TestGoogleDisplayVideo360DownloadReportOperator(TestCase):
+ def setUp(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
+ def tearDown(self):
+ with create_session() as session:
+ session.query(TI).delete()
-class TestGoogleDisplayVideo360GetReportOperator(TestCase):
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
@@ -109,10 +125,8 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
)
- @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
def test_execute(
self,
- mock_base_op,
mock_hook,
mock_gcs_hook,
mock_xcom,
@@ -120,11 +134,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
mock_request,
mock_shutil,
):
- report_id = "REPORT_ID"
- bucket_name = "BUCKET"
- report_name = "TEST.csv"
- filename = "test"
- mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
mock_hook.return_value.get_query.return_value = {
"metadata": {
"running": False,
@@ -132,10 +142,10 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
}
}
op = GoogleDisplayVideo360DownloadReportOperator(
- report_id=report_id,
+ report_id=REPORT_ID,
api_version=API_VERSION,
- bucket_name=bucket_name,
- report_name=report_name,
+ bucket_name=BUCKET_NAME,
+ report_name=REPORT_NAME,
task_id="test_task",
)
op.execute(context=None)
@@ -145,7 +155,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
api_version=API_VERSION,
impersonation_chain=None,
)
- mock_hook.return_value.get_query.assert_called_once_with(query_id=report_id)
+ mock_hook.return_value.get_query.assert_called_once_with(query_id=REPORT_ID)
mock_gcs_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -153,25 +163,82 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
impersonation_chain=None,
)
mock_gcs_hook.return_value.upload.assert_called_once_with(
- bucket_name=bucket_name,
- filename=filename,
+ bucket_name=BUCKET_NAME,
+ filename=FILENAME,
+ gzip=True,
+ mime_type="text/csv",
+ object_name=REPORT_NAME + ".gz",
+ )
+ mock_xcom.assert_called_once_with(None, key="report_name", value=REPORT_NAME + ".gz")
+
+ @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+ @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_set_bucket_name(
+ self,
+ test_bucket_name,
+ mock_hook,
+ mock_gcs_hook,
+ mock_temp,
+ mock_request,
+ mock_shutil,
+ ):
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
+ mock_hook.return_value.get_query.return_value = {
+ "metadata": {
+ "running": False,
+ "googleCloudStoragePathForLatestReport": "test",
+ }
+ }
+
+ dag = DAG(
+ dag_id="test_set_bucket_name",
+ start_date=DEFAULT_DATE,
+ schedule_interval=None,
+ catchup=False,
+ )
+
+ if BUCKET_NAME not in test_bucket_name:
+
+ @dag.task
+ def f():
+ return BUCKET_NAME
+
+ taskflow_op = f()
+ taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ op = GoogleDisplayVideo360DownloadReportOperator(
+ report_id=REPORT_ID,
+ api_version=API_VERSION,
+ bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+ report_name=REPORT_NAME,
+ task_id="test_task",
+ dag=dag,
+ )
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ mock_gcs_hook.return_value.upload.assert_called_once_with(
+ bucket_name=BUCKET_NAME,
+ filename=FILENAME,
gzip=True,
mime_type="text/csv",
- object_name=report_name + ".gz",
+ object_name=REPORT_NAME + ".gz",
)
- mock_xcom.assert_called_once_with(None, key="report_name", value=report_name + ".gz")
class TestGoogleDisplayVideo360RunReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
)
- @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.BaseOperator")
- def test_execute(self, mock_base_op, hook_mock):
- report_id = "QUERY_ID"
+ def test_execute(self, hook_mock):
parameters = {"param": "test"}
op = GoogleDisplayVideo360RunReportOperator(
- report_id=report_id,
+ report_id=REPORT_ID,
parameters=parameters,
api_version=API_VERSION,
task_id="test_task",
@@ -183,7 +250,7 @@ class TestGoogleDisplayVideo360RunReportOperator(TestCase):
api_version=API_VERSION,
impersonation_chain=None,
)
- hook_mock.return_value.run_query.assert_called_once_with(query_id=report_id, params=parameters)
+ hook_mock.return_value.run_query.assert_called_once_with(query_id=REPORT_ID, params=parameters)
class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
@@ -199,16 +266,13 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
"format": "format",
"fileSpec": "file_spec",
}
- bucket_name = "bucket_name"
- object_name = "object_name"
- filename = "test"
- mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
gzip = False
op = GoogleDisplayVideo360DownloadLineItemsOperator(
request_body=request_body,
- bucket_name=bucket_name,
- object_name=object_name,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
gzip=gzip,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
@@ -220,9 +284,9 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
op.execute(context=None)
gcs_hook_mock.return_value.upload.assert_called_with(
- bucket_name=bucket_name,
- object_name=object_name,
- filename=filename,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
+ filename=FILENAME,
gzip=gzip,
mime_type="text/csv",
)
@@ -248,16 +312,13 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
)
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile):
- filename = "filename"
- object_name = "object_name"
- bucket_name = "bucket_name"
line_items = "holy_hand_grenade"
gcs_hook_mock.return_value.download.return_value = line_items
- mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+ mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
op = GoogleDisplayVideo360UploadLineItemsOperator(
- bucket_name=bucket_name,
- object_name=object_name,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
task_id="test_task",
@@ -277,9 +338,9 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
)
gcs_hook_mock.return_value.download.assert_called_once_with(
- bucket_name=bucket_name,
- object_name=object_name,
- filename=filename,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
+ filename=FILENAME,
)
hook_mock.return_value.upload_line_items.assert_called_once()
hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items)
@@ -294,19 +355,16 @@ class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
def test_execute(self, mock_temp, gcs_mock_hook, mock_hook):
operation_name = "operation_name"
operation = {"key": "value"}
- bucket_name = "bucket_name"
- object_name = "object_name"
- filename = "filename"
gzip = False
# mock_hook.return_value.create_sdf_download_operation.return_value = response_name
mock_hook.return_value.get_sdf_download_operation.return_value = operation
- mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
op = GoogleDisplayVideo360SDFtoGCSOperator(
operation_name=operation_name,
- bucket_name=bucket_name,
- object_name=object_name,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
gzip=gzip,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
@@ -348,9 +406,9 @@ class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
gcs_mock_hook.return_value.upload.assert_called_once()
gcs_mock_hook.return_value.upload.assert_called_once_with(
- bucket_name=bucket_name,
- object_name=object_name,
- filename=filename,
+ bucket_name=BUCKET_NAME,
+ object_name=OBJECT_NAME,
+ filename=FILENAME,
gzip=gzip,
)
diff --git a/tests/providers/google/marketing_platform/operators/test_search_ads.py b/tests/providers/google/marketing_platform/operators/test_search_ads.py
index cc2ed7d..068d2b0 100644
--- a/tests/providers/google/marketing_platform/operators/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/operators/test_search_ads.py
@@ -19,14 +19,26 @@ import json
from tempfile import NamedTemporaryFile
from unittest import TestCase, mock
+from parameterized import parameterized
+
+from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.search_ads import (
GoogleSearchAdsDownloadReportOperator,
GoogleSearchAdsInsertReportOperator,
)
+from airflow.utils import timezone
+from airflow.utils.session import create_session
API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+END_DATE = timezone.datetime(2021, 1, 2)
+REPORT_ID = "report_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+FILE_NAME = "test"
+
class TestGoogleSearchAdsInsertReportOperator(TestCase):
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
@@ -37,8 +49,7 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
)
def test_execute(self, xcom_mock, mock_base_op, hook_mock):
report = {"report": "test"}
- report_id = "TEST"
- hook_mock.return_value.insert_report.return_value = {"id": report_id}
+ hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
op = GoogleSearchAdsInsertReportOperator(report=report, api_version=API_VERSION, task_id="test_task")
op.execute(context=None)
hook_mock.assert_called_once_with(
@@ -48,7 +59,7 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
impersonation_chain=None,
)
hook_mock.return_value.insert_report.assert_called_once_with(report=report)
- xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)
+ xcom_mock.assert_called_once_with(None, key="report_id", value=REPORT_ID)
def test_prepare_template(self):
report = {"key": "value"}
@@ -65,19 +76,23 @@ class TestGoogleSearchAdsInsertReportOperator(TestCase):
class TestGoogleSearchAdsDownloadReportOperator(TestCase):
+ def setUp(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
+ def tearDown(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
- @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
@mock.patch(
"airflow.providers.google.marketing_platform."
"operators.search_ads.GoogleSearchAdsDownloadReportOperator.xcom_push"
)
- def test_execute(self, xcom_mock, mock_base_op, hook_mock, gcs_hook_mock, tempfile_mock):
- report_id = "REPORT_ID"
- file_name = "TEST"
+ def test_execute(self, xcom_mock, hook_mock, gcs_hook_mock, tempfile_mock):
temp_file_name = "TEMP"
- bucket_name = "test"
data = b"data"
hook_mock.return_value.get.return_value = {"files": [0], "isReportReady": True}
@@ -85,9 +100,9 @@ class TestGoogleSearchAdsDownloadReportOperator(TestCase):
tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
op = GoogleSearchAdsDownloadReportOperator(
- report_id=report_id,
- report_name=file_name,
- bucket_name=bucket_name,
+ report_id=REPORT_ID,
+ report_name=FILE_NAME,
+ bucket_name=BUCKET_NAME,
api_version=API_VERSION,
task_id="test_task",
)
@@ -98,12 +113,57 @@ class TestGoogleSearchAdsDownloadReportOperator(TestCase):
api_version=API_VERSION,
impersonation_chain=None,
)
- hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0, report_id=report_id)
+ hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0, report_id=REPORT_ID)
tempfile_mock.return_value.__enter__.return_value.write.assert_called_once_with(data)
gcs_hook_mock.return_value.upload.assert_called_once_with(
- bucket_name=bucket_name,
+ bucket_name=BUCKET_NAME,
+ gzip=True,
+ object_name=FILE_NAME + ".csv.gz",
+ filename=temp_file_name,
+ )
+ xcom_mock.assert_called_once_with(None, key="file_name", value=FILE_NAME + ".csv.gz")
+
+ @parameterized.expand([BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ ti.xcom_pull(task_ids='f') }}"])
+ @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
+ @mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
+ def test_set_bucket_name(self, test_bucket_name, hook_mock, gcs_hook_mock, tempfile_mock):
+ temp_file_name = "TEMP"
+ data = b"data"
+
+ hook_mock.return_value.get.return_value = {"files": [0], "isReportReady": True}
+ hook_mock.return_value.get_file.return_value = data
+ tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
+
+ dag = DAG(
+ dag_id="test_set_bucket_name",
+ start_date=DEFAULT_DATE,
+ schedule_interval=None,
+ catchup=False,
+ )
+
+ if BUCKET_NAME not in test_bucket_name:
+
+ @dag.task
+ def f():
+ return BUCKET_NAME
+
+ taskflow_op = f()
+ taskflow_op.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ op = GoogleSearchAdsDownloadReportOperator(
+ report_id=REPORT_ID,
+ report_name=FILE_NAME,
+ bucket_name=test_bucket_name if test_bucket_name != "XComArg" else taskflow_op,
+ api_version=API_VERSION,
+ task_id="test_task",
+ dag=dag,
+ )
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ gcs_hook_mock.return_value.upload.assert_called_once_with(
+ bucket_name=BUCKET_NAME,
gzip=True,
- object_name=file_name + ".csv.gz",
+ object_name=FILE_NAME + ".csv.gz",
filename=temp_file_name,
)
- xcom_mock.assert_called_once_with(None, key="file_name", value=file_name + ".csv.gz")