You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/08 08:21:22 UTC
[airflow] branch master updated: Added SDFtoGCSOperator (#8740)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 58aefb2 Added SDFtoGCSOperator (#8740)
58aefb2 is described below
commit 58aefb23b1d456bbb24876a4e3ff14f25d6274b0
Author: Michał Słowikowski <mi...@gmail.com>
AuthorDate: Fri May 8 10:20:44 2020 +0200
Added SDFtoGCSOperator (#8740)
Co-authored-by: michalslowikowski00 <mi...@polidea.com>
---
.../providers/google/common/hooks/base_google.py | 22 ++-
.../example_dags/example_display_video.py | 74 ++++++++--
.../marketing_platform/hooks/display_video.py | 67 ++++++++-
.../marketing_platform/operators/display_video.py | 163 ++++++++++++++++++++-
.../marketing_platform/sensors/display_video.py | 59 +++++++-
docs/howto/operator/gcp/display_video.rst | 55 +++++++
.../marketing_platform/hooks/test_display_video.py | 151 ++++++++++++++++++-
.../operators/test_display_video.py | 141 +++++++++++++++---
.../operators/test_display_video_system.py | 11 +-
.../sensors/test_display_video.py | 25 +++-
10 files changed, 718 insertions(+), 50 deletions(-)
diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py
index a92d349..4d3418a 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -39,7 +39,7 @@ from google.api_core.gapic_v1.client_info import ClientInfo
from google.auth import _cloud_sdk
from google.auth.environment_vars import CREDENTIALS
from googleapiclient.errors import HttpError
-from googleapiclient.http import set_user_agent
+from googleapiclient.http import MediaIoBaseDownload, set_user_agent
from airflow import version
from airflow.exceptions import AirflowException
@@ -456,3 +456,23 @@ class GoogleBaseHook(BaseHook):
creds_content["refresh_token"],
])
yield
+
+ @staticmethod
+ def download_content_from_request(file_handle, request, chunk_size):
+ """
+ Download media resources.
+ Note that the Python file object is compatible with io.Base and can be used with this class also.
+
+ :param file_handle: io.Base or file object. The stream in which to write the downloaded
+ bytes.
+ :type file_handle: io.Base or file object
+ :param request: googleapiclient.http.HttpRequest, the media request to perform in chunks.
+ :type request: googleapiclient.http.HttpRequest
+ :param chunk_size: int, File will be downloaded in chunks of this many bytes.
+ :type chunk_size: int
+ """
+ downloader = MediaIoBaseDownload(file_handle, request, chunksize=chunk_size)
+ done = False
+ while done is False:
+ _, done = downloader.next_chunk()
+ file_handle.flush()
diff --git a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
index 6373972..a46cf2e 100644
--- a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+++ b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
@@ -19,15 +19,18 @@
Example Airflow DAG that shows how to use DisplayVideo.
"""
import os
+from typing import Dict
from airflow import models
+from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.operators.display_video import (
- GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
- GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
- GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+ GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+ GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+ GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+ GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
- GoogleDisplayVideo360ReportSensor,
+ GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates
@@ -35,6 +38,15 @@ from airflow.utils import dates
BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
+PATH_TO_UPLOAD_FILE = os.environ.get(
+ "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
+)
+PATH_TO_SAVED_FILE = os.environ.get(
+ "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
+)
+BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
REPORT = {
"kind": "doubleclickbidmanager#query",
@@ -55,14 +67,16 @@ REPORT = {
}
PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
+
+BODY_REQUEST: Dict = {
+ "version": SDF_VERSION,
+ "advertiserId": ADVERTISER_ID,
+ "inventorySourceFilter": {"inventorySourceIds": []},
+}
# [END howto_display_video_env_variables]
# download_line_items variables
-REQUEST_BODY = {
- "filterType": ADVERTISER_ID,
- "format": "CSV",
- "fileSpec": "EWF"
-}
+REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}
default_args = {"start_date": dates.days_ago(1)}
@@ -119,7 +133,47 @@ with models.DAG(
upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
task_id="upload_line_items",
bucket_name=BUCKET,
- object_name=OBJECT_NAME,
+ object_name=BUCKET_FILE_LOCATION,
)
# [END howto_google_display_video_upload_line_items_operator]
+
+ # [START howto_google_display_video_create_sdf_download_task_operator]
+ create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
+ task_id="create_sdf_download_task", body_request=BODY_REQUEST
+ )
+ operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
+ # [END howto_google_display_video_create_sdf_download_task_operator]
+
+ # [START howto_google_display_video_wait_for_operation_sensor]
+ wait_for_operation = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
+ task_id="wait_for_operation", operation_name=operation_name,
+ )
+ # [END howto_google_display_video_wait_for_operation_sensor]
+
+ # [START howto_google_display_video_save_sdf_in_gcs_operator]
+ save_sdf_in_gcs = GoogleDisplayVideo360SDFtoGCSOperator(
+ task_id="save_sdf_in_gcs",
+ operation_name=operation_name,
+ bucket_name=BUCKET,
+ object_name=BUCKET_FILE_LOCATION,
+ gzip=False,
+ )
+ # [END howto_google_display_video_save_sdf_in_gcs_operator]
+
+ # [START howto_google_display_video_gcs_to_big_query_operator]
+ upload_sdf_to_big_query = GCSToBigQueryOperator(
+ task_id="upload_sdf_to_big_query",
+ bucket=BUCKET,
+ source_objects=['{{ task_instance.xcom_pull("upload_sdf_to_bigquery")}}'],
+ destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
+ schema_fields=[
+ {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+ {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
+ ],
+ write_disposition="WRITE_TRUNCATE",
+ dag=dag,
+ )
+ # [END howto_google_display_video_gcs_to_big_query_operator]
+
create_report >> run_report >> wait_for_report >> get_report >> delete_report
+ create_sdf_download_task >> wait_for_operation >> save_sdf_in_gcs >> upload_sdf_to_big_query
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py b/airflow/providers/google/marketing_platform/hooks/display_video.py
index e7bb797..9985dd0 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -56,6 +56,20 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
)
return self._conn
+ def get_conn_to_display_video(self) -> Resource:
+ """
+ Retrieves connection to DisplayVideo.
+ """
+ if not self._conn:
+ http_authorized = self._authorize()
+ self._conn = build(
+ "displayvideo",
+ self.api_version,
+ http=http_authorized,
+ cache_discovery=False,
+ )
+ return self._conn
+
def create_query(self, query: Dict[str, Any]) -> Dict:
"""
Creates a query.
@@ -111,7 +125,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
.listqueries()
.execute(num_retries=self.num_retries)
)
- return response.get('queries', [])
+ return response.get("queries", [])
def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
"""
@@ -170,3 +184,54 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
.execute(num_retries=self.num_retries)
)
return response["lineItems"]
+
+ def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Creates an SDF Download Task and Returns an Operation.
+
+ :param body_request: Body request.
+ :type body_request: Dict[str, Any]
+
+ More information about body request can be found here:
+ https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create
+ """
+
+ result = (
+ self.get_conn_to_display_video() # pylint: disable=no-member
+ .sdfdownloadtasks()
+ .create(body=body_request)
+ .execute(num_retries=self.num_retries)
+ )
+ return result
+
+ def get_sdf_download_operation(self, operation_name: str):
+ """
+ Gets the latest state of an asynchronous SDF download task operation.
+
+ :param operation_name: The name of the operation resource.
+ :type operation_name: str
+ """
+
+ result = (
+ self.get_conn_to_display_video() # pylint: disable=no-member
+ .sdfdownloadtasks()
+ .operation()
+ .get(name=operation_name)
+ .execute(num_retries=self.num_retries)
+ )
+ return result
+
+ def download_media(self, resource_name: str):
+ """
+ Downloads media.
+
+ :param resource_name: The resource name of the media that is being downloaded.
+ :type resource_name: str
+ """
+
+ request = (
+ self.get_conn_to_display_video() # pylint: disable=no-member
+ .media()
+ .download_media(resource_name=resource_name)
+ )
+ return request
diff --git a/airflow/providers/google/marketing_platform/operators/display_video.py b/airflow/providers/google/marketing_platform/operators/display_video.py
index f4c70c5..993f93d 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -67,7 +67,7 @@ class GoogleDisplayVideo360CreateReportOperator(BaseOperator):
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
- **kwargs
+ **kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.body = body
@@ -125,7 +125,7 @@ class GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
- **kwargs
+ **kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
@@ -209,7 +209,7 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
- **kwargs
+ **kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
@@ -312,7 +312,7 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
- **kwargs
+ **kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
@@ -440,7 +440,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
- **kwargs
+ **kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.bucket_name = bucket_name
@@ -450,9 +450,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context: Dict):
- gcs_hook = GCSHook(
- gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
- )
+ gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -470,3 +468,152 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
)
f.flush()
hook.upload_line_items(line_items=line_items)
+
+
+class GoogleDisplayVideo360CreateSDFDownloadTaskOperator(BaseOperator):
+ """
+ Creates SDF operation task.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator`
+
+ .. seealso::
+ Check also the official API docs:
+ `https://developers.google.com/display-video/api/reference/rest`
+
+ :param version: The SDF version of the downloaded file.
+ :type version: str
+ :param partner_id: The ID of the partner to download SDF for.
+ :type partner_id: str
+ :param advertiser_id: The ID of the advertiser to download SDF for.
+ :type advertiser_id: str
+ :param parent_entity_filter: Filters on selected file types.
+ :type parent_entity_filter: Dict[str, Any]
+ :param id_filter: Filters on entities by their entity IDs.
+ :type id_filter: Dict[str, Any]
+ :param inventory_source_filter: Filters on Inventory Sources by their IDs.
+ :type inventory_source_filter: Dict[str, Any]
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :type delegate_to: str
+ """
+
+ template_fields = ("body_request", )
+
+ @apply_defaults
+ def __init__(
+ self,
+ body_request: Dict[str, Any],
+ api_version: str = "v1",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.body_request = body_request
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context: Dict):
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ )
+
+ self.log.info("Creating operation for SDF download task...")
+ operation = hook.create_sdf_download_operation(
+ body_request=self.body_request
+ )
+
+ return operation
+
+
+class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
+ """
+ Download SDF media and save it in the Google Cloud Storage.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GoogleDisplayVideo360SDFtoGCSOperator`
+
+ .. seealso::
+ Check also the official API docs:
+ `https://developers.google.com/display-video/api/reference/rest`
+
+ :param version: The SDF version of the downloaded file.
+ :type version: str
+ :param partner_id: The ID of the partner to download SDF for.
+ :type partner_id: str
+ :param advertiser_id: The ID of the advertiser to download SDF for.
+ :type advertiser_id: str
+ :param parent_entity_filter: Filters on selected file types.
+ :type parent_entity_filter: Dict[str, Any]
+ :param id_filter: Filters on entities by their entity IDs.
+ :type id_filter: Dict[str, Any]
+ :param inventory_source_filter: Filters on Inventory Sources by their IDs.
+ :type inventory_source_filter: Dict[str, Any]
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :type delegate_to: str
+ """
+
+ template_fields = ("operation_name", "bucket_name", "object_name", "body_request")
+
+ @apply_defaults
+ def __init__(
+ self,
+ operation_name: str,
+ bucket_name: str,
+ object_name: str,
+ gzip: bool = False,
+ api_version: str = "v1",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.operation_name = operation_name
+ self.bucket_name = bucket_name
+ self.object_name = object_name
+ self.gzip = gzip
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context: Dict):
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ )
+ gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+
+ self.log.info("Retrieving operation...")
+ operation = hook.get_sdf_download_operation(operation_name=self.operation_name)
+
+ self.log.info("Creating file for upload...")
+ media = hook.download_media(resource_name=operation)
+
+ self.log.info("Sending file to the Google Cloud Storage...")
+ with tempfile.NamedTemporaryFile() as temp_file:
+ hook.download_content_from_request(
+ temp_file, media, chunk_size=1024 * 1024
+ )
+ temp_file.flush()
+ gcs_hook.upload(
+ bucket_name=self.bucket_name,
+ object_name=self.object_name,
+ filename=temp_file.name,
+ gzip=self.gzip,
+ )
+
+ return f"{self.bucket_name}/{self.object_name}"
diff --git a/airflow/providers/google/marketing_platform/sensors/display_video.py b/airflow/providers/google/marketing_platform/sensors/display_video.py
index 4aafff3..7306c69 100644
--- a/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ b/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -20,6 +20,7 @@ Sensor for detecting the completion of DV360 reports.
"""
from typing import Dict, Optional
+from airflow import AirflowException
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
@@ -38,7 +39,7 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
- :param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
@@ -72,3 +73,59 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
if response and not response.get("metadata", {}).get("running"):
return True
return False
+
+
+class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
+ """
+ Sensor for detecting the completion of SDF operation.
+
+ .. seealso::
+ For more information on how to use this sensor, take a look at the guide:
+ :ref:`howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor`
+
+ :param operation_name: The name of the operation resource
+ :type operation_name: str
+ :param api_version: The version of the api that will be requested for example 'v1'.
+ :type api_version: str
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :type delegate_to: str
+
+ """
+
+ template_fields = ("operation_name", )
+
+ def __init__(
+ self,
+ operation_name: str,
+ api_version: str = "v1",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ mode: str = "reschedule",
+ poke_interval: int = 60 * 5,
+ *args,
+ **kwargs
+ ):
+ super().__init__(*args, **kwargs)
+ self.mode = mode
+ self.poke_interval = poke_interval
+ self.operation_name = operation_name
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def poke(self, context: Dict) -> bool:
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ )
+ operation = hook.get_sdf_download_operation(operation_name=self.operation_name)
+
+ if "error" in operation:
+ raise AirflowException(f'The operation finished in error with {operation["error"]}')
+ if operation and operation.get("done"):
+ return True
+ return False
diff --git a/docs/howto/operator/gcp/display_video.rst b/docs/howto/operator/gcp/display_video.rst
index 6890f81..04f4814 100644
--- a/docs/howto/operator/gcp/display_video.rst
+++ b/docs/howto/operator/gcp/display_video.rst
@@ -168,3 +168,58 @@ To run Display&Video 360 uploading line items use
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator`
parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator:
+
+Create SDF download task
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+To create an SDF download task use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_create_sdf_download_task_operator]
+ :end-before: [END howto_google_display_video_create_sdf_download_task_operator]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`
+parameters which allow you to dynamically determine values.
+
+
+.. _howto/operator:GoogleDisplayVideo360SDFtoGCSOperator:
+
+Save SDF files in the Google Cloud Storage
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To download SDF files and save them in the Google Cloud Storage use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_save_sdf_in_gcs_operator]
+ :end-before: [END howto_google_display_video_save_sdf_in_gcs_operator]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor:
+
+Waiting for SDF operation
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To wait for the SDF operation to complete use
+:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_wait_for_operation_sensor]
+ :end-before: [END howto_google_display_video_wait_for_operation_sensor]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`
+parameters which allow you to dynamically determine values.
diff --git a/tests/providers/google/marketing_platform/hooks/test_display_video.py b/tests/providers/google/marketing_platform/hooks/test_display_video.py
index e4bba9d..f61f118 100644
--- a/tests/providers/google/marketing_platform/hooks/test_display_video.py
+++ b/tests/providers/google/marketing_platform/hooks/test_display_video.py
@@ -36,8 +36,9 @@ class TestGoogleDisplayVideo360Hook(TestCase):
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook._authorize"
)
- @mock.patch("airflow.providers.google.marketing_platform.hooks."
- "display_video.build")
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks." "display_video.build"
+ )
def test_gen_conn(self, mock_build, mock_authorize):
result = self.hook.get_conn()
mock_build.assert_called_once_with(
@@ -50,6 +51,23 @@ class TestGoogleDisplayVideo360Hook(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook._authorize"
+ )
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks." "display_video.build"
+ )
+ def test_get_conn_to_display_video(self, mock_build, mock_authorize):
+ result = self.hook.get_conn_to_display_video()
+ mock_build.assert_called_once_with(
+ "displayvideo",
+ API_VERSION,
+ http=mock_authorize.return_value,
+ cache_discovery=False,
+ )
+ self.assertEqual(mock_build.return_value, result)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_create_query(self, get_conn_mock):
@@ -239,3 +257,132 @@ class TestGoogleDisplayVideo360Hook(TestCase):
result = self.hook.upload_line_items(line_items)
self.assertEqual(return_value, result)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_create_sdf_download_tasks_called_with_params(
+ self, get_conn_to_display_video
+ ):
+ body_request = {
+ "version": "version",
+ "partnerId": "partner_id",
+ "advertiserId": "advertiser_id",
+ "parentEntityFilter": "parent_entity_filter",
+ "idFilter": "id_filter",
+ "inventorySourceFilter": "inventory_source_filter",
+ }
+
+ self.hook.create_sdf_download_operation(body_request=body_request)
+
+ get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once_with(
+ body=body_request
+ )
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_create_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+ body_request = {
+ "version": "version",
+ "partnerId": "partner_id",
+ "advertiserId": "advertiser_id",
+ "parentEntityFilter": "parent_entity_filter",
+ "idFilter": "id_filter",
+ "inventorySourceFilter": "inventory_source_filter",
+ }
+
+ self.hook.create_sdf_download_operation(body_request=body_request)
+
+ get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once()
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_create_sdf_download_tasks_return_equal_values(
+ self, get_conn_to_display_video
+ ):
+ response = ["name"]
+ body_request = {
+ "version": "version",
+ "partnerId": "partner_id",
+ "advertiserId": "advertiser_id",
+ "parentEntityFilter": "parent_entity_filter",
+ "idFilter": "id_filter",
+ "inventorySourceFilter": "inventory_source_filter",
+ }
+
+ get_conn_to_display_video.return_value.\
+ sdfdownloadtasks.return_value.\
+ create.return_value\
+ .execute.return_value = response
+
+ result = self.hook.create_sdf_download_operation(body_request=body_request)
+ self.assertEqual(response, result)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_get_sdf_download_tasks_called_with_params(self, get_conn_to_display_video):
+ operation_name = "operation_name"
+ self.hook.get_sdf_download_operation(operation_name=operation_name)
+ get_conn_to_display_video.return_value.\
+ sdfdownloadtasks.return_value.\
+ operation.return_value.\
+ get.assert_called_once_with(name=operation_name)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_get_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+ operation_name = "name"
+ self.hook.get_sdf_download_operation(operation_name=operation_name)
+ get_conn_to_display_video.return_value.\
+ sdfdownloadtasks.return_value.\
+ operation.return_value.\
+ get.assert_called_once()
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def get_sdf_download_tasks_return_equal_values(self, get_conn_to_display_video):
+ operation_name = "operation"
+ response = "reposonse"
+
+ get_conn_to_display_video.return_value.\
+ sdfdownloadtasks.return_value.\
+ operation.return_value.get = response
+
+ result = self.hook.get_sdf_download_operation(operation_name=operation_name)
+
+ self.assertEqual(operation_name, result)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_download_media_called_once(self, get_conn_to_display_video):
+ resource_name = "resource_name"
+
+ self.hook.download_media(resource_name=resource_name)
+ get_conn_to_display_video.return_value.\
+ media.return_value.\
+ download_media.assert_called_once()
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+ )
+ def test_download_media_called_once_with_params(self, get_conn_to_display_video):
+ resource_name = "resource_name"
+
+ self.hook.download_media(resource_name=resource_name)
+ get_conn_to_display_video.return_value.\
+ media.return_value.\
+ download_media.assert_called_once_with(resource_name=resource_name)
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video.py b/tests/providers/google/marketing_platform/operators/test_display_video.py
index 45fe897..dcff1cb 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -19,9 +19,10 @@
from unittest import TestCase, mock
from airflow.providers.google.marketing_platform.operators.display_video import (
- GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
- GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
- GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+ GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+ GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+ GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+ GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
)
API_VERSION = "api_version"
@@ -80,8 +81,7 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
class TestGoogleDisplayVideo360GetReportOperator(TestCase):
@mock.patch(
- "airflow.providers.google.marketing_platform.operators."
- "display_video.shutil"
+ "airflow.providers.google.marketing_platform.operators." "display_video.shutil"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
@@ -96,8 +96,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
"display_video.GoogleDisplayVideo360DownloadReportOperator.xcom_push"
)
@mock.patch(
- "airflow.providers.google.marketing_platform.operators."
- "display_video.GCSHook"
+ "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
@@ -189,8 +188,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
- "airflow.providers.google.marketing_platform.operators."
- "display_video.GCSHook"
+ "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
@@ -201,7 +199,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
"filterType": "filter_type",
"filterIds": [],
"format": "format",
- "fileSpec": "file_spec"
+ "fileSpec": "file_spec",
}
bucket_name = "bucket_name"
object_name = "object_name"
@@ -227,12 +225,11 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
object_name=object_name,
filename=filename,
gzip=gzip,
- mime_type='text/csv',
+ mime_type="text/csv",
)
gcs_hook_mock.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID,
- delegate_to=DELEGATE_TO,
+ gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO,
)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
@@ -252,8 +249,7 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
- "airflow.providers.google.marketing_platform.operators."
- "display_video.GCSHook"
+ "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
)
def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile):
filename = "filename"
@@ -261,7 +257,9 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
bucket_name = "bucket_name"
line_items = "holy_hand_grenade"
gcs_hook_mock.return_value.download.return_value = line_items
- mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+ mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = (
+ filename
+ )
op = GoogleDisplayVideo360UploadLineItemsOperator(
bucket_name=bucket_name,
@@ -272,20 +270,117 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
)
op.execute(context=None)
hook_mock.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID,
- api_version=API_VERSION,
- delegate_to=DELEGATE_TO
+ gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
)
gcs_hook_mock.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID,
- delegate_to=DELEGATE_TO,
+ gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO,
)
gcs_hook_mock.return_value.download.assert_called_once_with(
+ bucket_name=bucket_name, object_name=object_name, filename=filename,
+ )
+ hook_mock.return_value.upload_line_items.assert_called_once()
+ hook_mock.return_value.upload_line_items.assert_called_once_with(
+ line_items=line_items
+ )
+
+
+class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.GoogleDisplayVideo360Hook"
+ )
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
+ )
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.tempfile"
+ )
+ def test_execute(self, mock_temp, gcs_mock_hook, mock_hook):
+ operation_name = "operation_name"
+ operation = {"key": "value"}
+ bucket_name = "bucket_name"
+ object_name = "object_name"
+ filename = "filename"
+ gzip = False
+
+ # mock_hook.return_value.create_sdf_download_operation.return_value = response_name
+ mock_hook.return_value.get_sdf_download_operation.return_value = operation
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+
+ op = GoogleDisplayVideo360SDFtoGCSOperator(
+ operation_name=operation_name,
+ bucket_name=bucket_name,
+ object_name=object_name,
+ gzip=gzip,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ task_id="test_task",
+ )
+
+ op.execute(context=None)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
+ )
+
+ mock_hook.return_value.get_sdf_download_operation.assert_called_once()
+ mock_hook.return_value.get_sdf_download_operation.assert_called_once_with(
+ operation_name=operation_name
+ )
+
+ mock_hook.return_value.download_media.assert_called_once()
+ mock_hook.return_value.download_media.assert_called_once_with(
+ resource_name=mock_hook.return_value.get_sdf_download_operation.return_value
+ )
+
+ mock_hook.return_value.download_content_from_request.assert_called_once()
+ mock_hook.return_value.download_content_from_request.assert_called_once_with(
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value,
+ mock_hook.return_value.download_media(),
+ chunk_size=1024 * 1024,
+ )
+
+ gcs_mock_hook.assert_called_once()
+ gcs_mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO
+ )
+
+ gcs_mock_hook.return_value.upload.assert_called_once()
+ gcs_mock_hook.return_value.upload.assert_called_once_with(
bucket_name=bucket_name,
object_name=object_name,
filename=filename,
+ gzip=gzip,
+ )
+
+
+class TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator(TestCase):
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_execute(self, mock_hook):
+ body_request = {
+ "version": "1",
+ "id": "id",
+ "filter": {"id": []},
+ }
+
+ op = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
+ body_request=body_request,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ task_id="test_task",
+ )
+
+ op.execute(context=None)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
+ )
+
+ mock_hook.return_value.create_sdf_download_operation.assert_called_once()
+ mock_hook.return_value.create_sdf_download_operation.assert_called_once_with(
+ body_request=body_request
)
- hook_mock.return_value.upload_line_items.assert_called_once()
- hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items)
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video_system.py b/tests/providers/google/marketing_platform/operators/test_display_video_system.py
index 8cc441d..7d4358d 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video_system.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video_system.py
@@ -16,14 +16,16 @@
# under the License.
import pytest
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY
from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
# Requires the following scopes:
SCOPES = [
"https://www.googleapis.com/auth/doubleclickbidmanager",
- "https://www.googleapis.com/auth/cloud-platform"
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/display-video"
]
@@ -37,7 +39,10 @@ class DisplayVideoSystemTest(GoogleSystemTest):
def tearDown(self):
self.delete_gcs_bucket(BUCKET)
- super().tearDown()
+ with provide_gcp_context(GCP_BIGQUERY_KEY, scopes=SCOPES):
+ hook = BigQueryHook()
+ hook.delete_dataset(dataset_id='airflow_test', delete_contents=True)
+ super().tearDown()
@provide_gcp_context(GMP_KEY, scopes=SCOPES)
def test_run_example_dag(self):
diff --git a/tests/providers/google/marketing_platform/sensors/test_display_video.py b/tests/providers/google/marketing_platform/sensors/test_display_video.py
index 624b35b..2daca30 100644
--- a/tests/providers/google/marketing_platform/sensors/test_display_video.py
+++ b/tests/providers/google/marketing_platform/sensors/test_display_video.py
@@ -19,7 +19,7 @@
from unittest import TestCase, mock
from airflow.providers.google.marketing_platform.sensors.display_video import (
- GoogleDisplayVideo360ReportSensor,
+ GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
)
API_VERSION = "api_version"
@@ -45,3 +45,26 @@ class TestGoogleDisplayVideo360ReportSensor(TestCase):
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id)
+
+
+class TestGoogleDisplayVideo360Sensor(TestCase):
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.sensors."
+ "display_video.GoogleDisplayVideo360Hook"
+ )
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.sensors."
+ "display_video.BaseSensorOperator"
+ )
+ def test_poke(self, mock_base_op, hook_mock):
+ operation_name = "operation_name"
+ op = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
+ operation_name=operation_name, api_version=API_VERSION, task_id="test_task",
+ )
+ op.poke(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
+ )
+ hook_mock.return_value.get_sdf_download_operation.assert_called_once_with(
+ operation_name=operation_name
+ )