You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/08 08:21:22 UTC

[airflow] branch master updated: Added SDFtoGCSOperator (#8740)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 58aefb2  Added SDFtoGCSOperator (#8740)
58aefb2 is described below

commit 58aefb23b1d456bbb24876a4e3ff14f25d6274b0
Author: Michał Słowikowski <mi...@gmail.com>
AuthorDate: Fri May 8 10:20:44 2020 +0200

    Added SDFtoGCSOperator (#8740)
    
    Co-authored-by: michalslowikowski00 <mi...@polidea.com>
---
 .../providers/google/common/hooks/base_google.py   |  22 ++-
 .../example_dags/example_display_video.py          |  74 ++++++++--
 .../marketing_platform/hooks/display_video.py      |  67 ++++++++-
 .../marketing_platform/operators/display_video.py  | 163 ++++++++++++++++++++-
 .../marketing_platform/sensors/display_video.py    |  59 +++++++-
 docs/howto/operator/gcp/display_video.rst          |  55 +++++++
 .../marketing_platform/hooks/test_display_video.py | 151 ++++++++++++++++++-
 .../operators/test_display_video.py                | 141 +++++++++++++++---
 .../operators/test_display_video_system.py         |  11 +-
 .../sensors/test_display_video.py                  |  25 +++-
 10 files changed, 718 insertions(+), 50 deletions(-)

diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py
index a92d349..4d3418a 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -39,7 +39,7 @@ from google.api_core.gapic_v1.client_info import ClientInfo
 from google.auth import _cloud_sdk
 from google.auth.environment_vars import CREDENTIALS
 from googleapiclient.errors import HttpError
-from googleapiclient.http import set_user_agent
+from googleapiclient.http import MediaIoBaseDownload, set_user_agent
 
 from airflow import version
 from airflow.exceptions import AirflowException
@@ -456,3 +456,23 @@ class GoogleBaseHook(BaseHook):
                         creds_content["refresh_token"],
                     ])
             yield
+
+    @staticmethod
+    def download_content_from_request(file_handle, request, chunk_size: int):
+        """
+        Downloads the media of ``request`` into ``file_handle`` in chunks, then flushes the handle.
+        Any file object compatible with ``io.Base`` can be used as the handle.
+
+        :param file_handle: io.Base or file object. The stream in which to write the downloaded
+            bytes.
+        :type file_handle: io.Base or file object
+        :param request: googleapiclient.http.HttpRequest, the media request to perform in chunks.
+        :type request: googleapiclient.http.HttpRequest
+        :param chunk_size: int, File will be downloaded in chunks of this many bytes.
+        :type chunk_size: int
+        """
+        downloader = MediaIoBaseDownload(file_handle, request, chunksize=chunk_size)
+        done = False
+        while not done:
+            _, done = downloader.next_chunk()
+        file_handle.flush()
diff --git a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
index 6373972..a46cf2e 100644
--- a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+++ b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
@@ -19,15 +19,18 @@
 Example Airflow DAG that shows how to use DisplayVideo.
 """
 import os
+from typing import Dict
 
 from airflow import models
+from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.marketing_platform.operators.display_video import (
-    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
-    GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
-    GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+    GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+    GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+    GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
 )
 from airflow.providers.google.marketing_platform.sensors.display_video import (
-    GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
 )
 from airflow.utils import dates
 
@@ -35,6 +38,15 @@ from airflow.utils import dates
 BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
 ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
 OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
+PATH_TO_UPLOAD_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
+)
+PATH_TO_SAVED_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
+)
+BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
 
 REPORT = {
     "kind": "doubleclickbidmanager#query",
@@ -55,14 +67,16 @@ REPORT = {
 }
 
 PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
+
+BODY_REQUEST: Dict = {
+    "version": SDF_VERSION,
+    "advertiserId": ADVERTISER_ID,
+    "inventorySourceFilter": {"inventorySourceIds": []},
+}
 # [END howto_display_video_env_variables]
 
 # download_line_items variables
-REQUEST_BODY = {
-    "filterType": ADVERTISER_ID,
-    "format": "CSV",
-    "fileSpec": "EWF"
-}
+REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}
 
 default_args = {"start_date": dates.days_ago(1)}
 
@@ -119,7 +133,47 @@ with models.DAG(
     upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
         task_id="upload_line_items",
         bucket_name=BUCKET,
-        object_name=OBJECT_NAME,
+        object_name=BUCKET_FILE_LOCATION,
     )
     # [END howto_google_display_video_upload_line_items_operator]
+
+    # [START howto_google_display_video_create_sdf_download_task_operator]
+    create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
+        task_id="create_sdf_download_task", body_request=BODY_REQUEST
+    )
+    operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
+    # [END howto_google_display_video_create_sdf_download_task_operator]
+
+    # [START howto_google_display_video_wait_for_operation_sensor]
+    wait_for_operation = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
+        task_id="wait_for_operation", operation_name=operation_name,
+    )
+    # [END howto_google_display_video_wait_for_operation_sensor]
+
+    # [START howto_google_display_video_save_sdf_in_gcs_operator]
+    save_sdf_in_gcs = GoogleDisplayVideo360SDFtoGCSOperator(
+        task_id="save_sdf_in_gcs",
+        operation_name=operation_name,
+        bucket_name=BUCKET,
+        object_name=BUCKET_FILE_LOCATION,
+        gzip=False,
+    )
+    # [END howto_google_display_video_save_sdf_in_gcs_operator]
+
+    # [START howto_google_display_video_gcs_to_big_query_operator]
+    upload_sdf_to_big_query = GCSToBigQueryOperator(
+        task_id="upload_sdf_to_big_query",
+        bucket=BUCKET,
+        source_objects=[BUCKET_FILE_LOCATION],
+        destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
+        schema_fields=[
+            {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+            {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
+        ],
+        write_disposition="WRITE_TRUNCATE",
+        dag=dag,
+    )
+    # [END howto_google_display_video_gcs_to_big_query_operator]
+
     create_report >> run_report >> wait_for_report >> get_report >> delete_report
+    create_sdf_download_task >> wait_for_operation >> save_sdf_in_gcs >> upload_sdf_to_big_query
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py b/airflow/providers/google/marketing_platform/hooks/display_video.py
index e7bb797..9985dd0 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -56,6 +56,20 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
             )
         return self._conn
 
+    def get_conn_to_display_video(self) -> Resource:
+        """
+        Retrieves a Display & Video 360 API client, cached separately from ``get_conn``.
+        """
+        if not getattr(self, "_display_video_conn", None):
+            http_authorized = self._authorize()
+            self._display_video_conn = build(  # pylint: disable=attribute-defined-outside-init
+                "displayvideo",
+                self.api_version,
+                http=http_authorized,
+                cache_discovery=False,
+            )
+        return self._display_video_conn
+
     def create_query(self, query: Dict[str, Any]) -> Dict:
         """
         Creates a query.
@@ -111,7 +125,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
             .listqueries()
             .execute(num_retries=self.num_retries)
         )
-        return response.get('queries', [])
+        return response.get("queries", [])
 
     def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
         """
@@ -170,3 +184,54 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
             .execute(num_retries=self.num_retries)
         )
         return response["lineItems"]
+
+    def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Creates an SDF download task and returns the operation that tracks it.
+
+        :param body_request: Body of the ``sdfdownloadtasks.create`` request.
+        :type body_request: Dict[str, Any]
+
+        More information about the request body can be found here:
+        https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create
+        """
+
+        result = (
+            self.get_conn_to_display_video()  # pylint: disable=no-member
+            .sdfdownloadtasks()
+            .create(body=body_request)
+            .execute(num_retries=self.num_retries)
+        )
+        return result
+
+    def get_sdf_download_operation(self, operation_name: str) -> Dict[str, Any]:
+        """
+        Gets the latest state of an asynchronous SDF download task operation.
+
+        :param operation_name: The name of the operation resource.
+        :type operation_name: str
+        """
+
+        result = (
+            self.get_conn_to_display_video()  # pylint: disable=no-member
+            .sdfdownloadtasks()
+            .operations()
+            .get(name=operation_name)
+            .execute(num_retries=self.num_retries)
+        )
+        return result
+
+    def download_media(self, resource_name: str):
+        """
+        Builds (without executing) a request that downloads the media with the given name.
+
+        :param resource_name: Resource name of the media that is being downloaded.
+        :type resource_name: str
+        """
+
+        request = (
+            self.get_conn_to_display_video()  # pylint: disable=no-member
+            .media()
+            .download_media(resourceName=resource_name)
+        )
+        return request
diff --git a/airflow/providers/google/marketing_platform/operators/display_video.py b/airflow/providers/google/marketing_platform/operators/display_video.py
index f4c70c5..993f93d 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -67,7 +67,7 @@ class GoogleDisplayVideo360CreateReportOperator(BaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         *args,
-        **kwargs
+        **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.body = body
@@ -125,7 +125,7 @@ class GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         *args,
-        **kwargs
+        **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.report_id = report_id
@@ -209,7 +209,7 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         *args,
-        **kwargs
+        **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.report_id = report_id
@@ -312,7 +312,7 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         *args,
-        **kwargs
+        **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.report_id = report_id
@@ -440,7 +440,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         *args,
-        **kwargs
+        **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.bucket_name = bucket_name
@@ -450,9 +450,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context: Dict):
-        gcs_hook = GCSHook(
-            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
-        )
+        gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
         hook = GoogleDisplayVideo360Hook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -470,3 +468,152 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator):
             )
             f.flush()
             hook.upload_line_items(line_items=line_items)
+
+
+class GoogleDisplayVideo360CreateSDFDownloadTaskOperator(BaseOperator):
+    """
+    Creates an SDF download task and returns the operation that tracks it.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/display-video/api/reference/rest`
+
+    :param body_request: Body of the ``sdfdownloadtasks.create`` request. It carries the SDF
+        ``version``, the ``partnerId`` or ``advertiserId`` to download SDF for, and optional
+        ``parentEntityFilter``, ``idFilter`` or ``inventorySourceFilter`` filters. See:
+        https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create
+    :type body_request: Dict[str, Any]
+    :param api_version: The version of the api that will be requested for example 'v1'.
+    :type api_version: str
+    :return: The SDF download operation created by the API. It is pushed to XCom, and
+        its ``name`` is the ``operation_name`` expected by
+        ``GoogleDisplayVideo360GetSDFDownloadOperationSensor`` and
+        ``GoogleDisplayVideo360SDFtoGCSOperator``.
+    :rtype: Dict[str, Any]
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ("body_request",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        body_request: Dict[str, Any],
+        api_version: str = "v1",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.body_request = body_request
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context: Dict):
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+        )
+
+        self.log.info("Creating operation for SDF download task...")
+        operation = hook.create_sdf_download_operation(
+            body_request=self.body_request
+        )
+
+        return operation
+
+
+class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
+    """
+    Downloads the SDF file produced by an SDF download operation and saves it in Google Cloud Storage.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GoogleDisplayVideo360SDFtoGCSOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/display-video/api/reference/rest`
+
+    :param operation_name: The name of the SDF download operation, e.g. the ``name`` of the
+        operation returned by ``GoogleDisplayVideo360CreateSDFDownloadTaskOperator``. The
+        operation must be done, as the file is read from its ``response.resourceName``.
+    :type operation_name: str
+    :param bucket_name: The bucket to upload the SDF file to.
+    :type bucket_name: str
+    :param object_name: The object name to set when uploading the SDF file.
+    :type object_name: str
+    :param gzip: Option to compress the file before uploading it.
+    :type gzip: bool
+    :param api_version: The version of the api that will be requested for example 'v1'.
+    :type api_version: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ("operation_name", "bucket_name", "object_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        operation_name: str,
+        bucket_name: str,
+        object_name: str,
+        gzip: bool = False,
+        api_version: str = "v1",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.operation_name = operation_name
+        self.bucket_name = bucket_name
+        self.object_name = object_name
+        self.gzip = gzip
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context: Dict):
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+        )
+        gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+
+        self.log.info("Retrieving operation...")
+        operation = hook.get_sdf_download_operation(operation_name=self.operation_name)
+
+        self.log.info("Creating file for upload...")
+        # The done operation's response (an SdfDownloadTask) names the media holding the SDF file.
+        media = hook.download_media(resource_name=operation["response"]["resourceName"])
+
+        self.log.info("Sending file to the Google Cloud Storage...")
+        with tempfile.NamedTemporaryFile() as temp_file:
+            hook.download_content_from_request(
+                temp_file, media, chunk_size=1024 * 1024
+            )
+            gcs_hook.upload(
+                bucket_name=self.bucket_name,
+                object_name=self.object_name,
+                filename=temp_file.name,
+                gzip=self.gzip,
+            )
+
+        return f"{self.bucket_name}/{self.object_name}"
diff --git a/airflow/providers/google/marketing_platform/sensors/display_video.py b/airflow/providers/google/marketing_platform/sensors/display_video.py
index 4aafff3..7306c69 100644
--- a/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ b/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -20,6 +20,7 @@ Sensor for detecting the completion of DV360 reports.
 """
 from typing import Dict, Optional
 
+from airflow.exceptions import AirflowException
 from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 
@@ -38,7 +39,7 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
     :type api_version: str
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
         request must have  domain-wide delegation enabled.
     :type delegate_to: str
     """
@@ -72,3 +73,59 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
         if response and not response.get("metadata", {}).get("running"):
             return True
         return False
+
+
+class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
+    """
+    Sensor for detecting the completion of an SDF download operation.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor`
+
+    :param operation_name: The name of the operation resource.
+    :type operation_name: str
+    :param api_version: The version of the api that will be requested for example 'v1'.
+    :type api_version: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :type delegate_to: str
+    :raises AirflowException: If the operation finished with an error.
+    """
+
+    template_fields = ("operation_name",)
+
+    def __init__(
+        self,
+        operation_name: str,
+        api_version: str = "v1",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        mode: str = "reschedule",
+        poke_interval: int = 60 * 5,
+        *args,
+        **kwargs,
+    ) -> None:
+        # Hand mode/poke_interval to BaseSensorOperator so it sets (and validates) them,
+        # instead of overwriting its attributes after its __init__ has already run.
+        super().__init__(*args, mode=mode, poke_interval=poke_interval, **kwargs)
+        self.operation_name = operation_name
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context: Dict) -> bool:
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+        )
+        operation = hook.get_sdf_download_operation(operation_name=self.operation_name)
+
+        if operation and "error" in operation:
+            raise AirflowException(f'The operation finished in error with {operation["error"]}')
+        if operation and operation.get("done"):
+            return True
+        return False
diff --git a/docs/howto/operator/gcp/display_video.rst b/docs/howto/operator/gcp/display_video.rst
index 6890f81..04f4814 100644
--- a/docs/howto/operator/gcp/display_video.rst
+++ b/docs/howto/operator/gcp/display_video.rst
@@ -168,3 +168,58 @@ To run Display&Video 360 uploading line items use
 Use :ref:`Jinja templating <jinja-templating>` with
 :template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator`
 parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator:
+
+Create SDF download task
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+To create an SDF download task use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_create_sdf_download_task_operator]
+    :end-before: [END howto_google_display_video_create_sdf_download_task_operator]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`
+parameters which allow you to dynamically determine values.
+
+
+.. _howto/operator:GoogleDisplayVideo360SDFtoGCSOperator:
+
+Save SDF files in the Google Cloud Storage
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To download SDF files and save them in Google Cloud Storage use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_save_sdf_in_gcs_operator]
+    :end-before: [END howto_google_display_video_save_sdf_in_gcs_operator]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor:
+
+Waiting for SDF operation
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To wait for an SDF download operation to finish use
+:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_wait_for_operation_sensor]
+    :end-before: [END howto_google_display_video_wait_for_operation_sensor]
+
+Use :ref:`Jinja templating <jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`
+parameters which allow you to dynamically determine values.
diff --git a/tests/providers/google/marketing_platform/hooks/test_display_video.py b/tests/providers/google/marketing_platform/hooks/test_display_video.py
index e4bba9d..f61f118 100644
--- a/tests/providers/google/marketing_platform/hooks/test_display_video.py
+++ b/tests/providers/google/marketing_platform/hooks/test_display_video.py
@@ -36,8 +36,9 @@ class TestGoogleDisplayVideo360Hook(TestCase):
         "airflow.providers.google.marketing_platform.hooks."
         "display_video.GoogleDisplayVideo360Hook._authorize"
     )
-    @mock.patch("airflow.providers.google.marketing_platform.hooks."
-                "display_video.build")
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks.display_video.build"
+    )
     def test_gen_conn(self, mock_build, mock_authorize):
         result = self.hook.get_conn()
         mock_build.assert_called_once_with(
@@ -50,6 +51,23 @@ class TestGoogleDisplayVideo360Hook(TestCase):
 
     @mock.patch(
         "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook._authorize"
+    )
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks.display_video.build"
+    )
+    def test_get_conn_to_display_video(self, mock_build, mock_authorize):
+        result = self.hook.get_conn_to_display_video()
+        mock_build.assert_called_once_with(
+            "displayvideo",
+            API_VERSION,
+            http=mock_authorize.return_value,
+            cache_discovery=False,
+        )
+        self.assertEqual(mock_build.return_value, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
         "display_video.GoogleDisplayVideo360Hook.get_conn"
     )
     def test_create_query(self, get_conn_mock):
@@ -239,3 +257,132 @@ class TestGoogleDisplayVideo360Hook(TestCase):
         result = self.hook.upload_line_items(line_items)
 
         self.assertEqual(return_value, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_with_params(
+        self, get_conn_to_display_video
+    ):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once_with(
+            body=body_request
+        )
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_return_equal_values(
+        self, get_conn_to_display_video
+    ):
+        response = ["name"]
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            create.return_value\
+            .execute.return_value = response
+
+        result = self.hook.create_sdf_download_operation(body_request=body_request)
+        self.assertEqual(response, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_with_params(self, get_conn_to_display_video):
+        operation_name = "operation_name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operations.return_value.\
+            get.assert_called_once_with(name=operation_name)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        operation_name = "name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operations.return_value.\
+            get.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_return_equal_values(self, get_conn_to_display_video):
+        operation_name = "operation"
+        response = "response"
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operations.return_value.get.return_value.execute.return_value = response
+
+        result = self.hook.get_sdf_download_operation(operation_name=operation_name)
+
+        self.assertEqual(response, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once_with_params(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once_with(resourceName=resource_name)
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video.py b/tests/providers/google/marketing_platform/operators/test_display_video.py
index 45fe897..dcff1cb 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -19,9 +19,10 @@
 from unittest import TestCase, mock
 
 from airflow.providers.google.marketing_platform.operators.display_video import (
-    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
-    GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
-    GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+    GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+    GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+    GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
 )
 
 API_VERSION = "api_version"
@@ -80,8 +81,7 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
 
 class TestGoogleDisplayVideo360GetReportOperator(TestCase):
     @mock.patch(
-        "airflow.providers.google.marketing_platform.operators."
-        "display_video.shutil"
+        "airflow.providers.google.marketing_platform.operators." "display_video.shutil"
     )
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators."
@@ -96,8 +96,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase):
         "display_video.GoogleDisplayVideo360DownloadReportOperator.xcom_push"
     )
     @mock.patch(
-        "airflow.providers.google.marketing_platform.operators."
-        "display_video.GCSHook"
+        "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
     )
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators."
@@ -189,8 +188,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
         "display_video.GoogleDisplayVideo360Hook"
     )
     @mock.patch(
-        "airflow.providers.google.marketing_platform.operators."
-        "display_video.GCSHook"
+        "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
     )
     @mock.patch(
         "airflow.providers.google.marketing_platform.operators."
@@ -201,7 +199,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
             "filterType": "filter_type",
             "filterIds": [],
             "format": "format",
-            "fileSpec": "file_spec"
+            "fileSpec": "file_spec",
         }
         bucket_name = "bucket_name"
         object_name = "object_name"
@@ -227,12 +225,11 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase):
             object_name=object_name,
             filename=filename,
             gzip=gzip,
-            mime_type='text/csv',
+            mime_type="text/csv",
         )
 
         gcs_hook_mock.assert_called_once_with(
-            gcp_conn_id=GCP_CONN_ID,
-            delegate_to=DELEGATE_TO,
+            gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO,
         )
         hook_mock.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
@@ -252,8 +249,7 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
         "display_video.GoogleDisplayVideo360Hook"
     )
     @mock.patch(
-        "airflow.providers.google.marketing_platform.operators."
-        "display_video.GCSHook"
+        "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
     )
     def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile):
         filename = "filename"
@@ -261,7 +257,9 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
         bucket_name = "bucket_name"
         line_items = "holy_hand_grenade"
         gcs_hook_mock.return_value.download.return_value = line_items
-        mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+        mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = (
+            filename
+        )
 
         op = GoogleDisplayVideo360UploadLineItemsOperator(
             bucket_name=bucket_name,
@@ -272,20 +270,117 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase):
         )
         op.execute(context=None)
         hook_mock.assert_called_once_with(
-            gcp_conn_id=GCP_CONN_ID,
-            api_version=API_VERSION,
-            delegate_to=DELEGATE_TO
+            gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
         )
 
         gcs_hook_mock.assert_called_once_with(
-            gcp_conn_id=GCP_CONN_ID,
-            delegate_to=DELEGATE_TO,
+            gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO,
         )
 
         gcs_hook_mock.return_value.download.assert_called_once_with(
+            bucket_name=bucket_name, object_name=object_name, filename=filename,
+        )
+        hook_mock.return_value.upload_line_items.assert_called_once()
+        hook_mock.return_value.upload_line_items.assert_called_once_with(
+            line_items=line_items
+        )
+
+
+class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase):
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.GoogleDisplayVideo360Hook"
+    )
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook"
+    )
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.tempfile"
+    )
+    def test_execute(self, mock_temp, gcs_mock_hook, mock_hook):
+        operation_name = "operation_name"
+        operation = {"key": "value"}
+        bucket_name = "bucket_name"
+        object_name = "object_name"
+        filename = "filename"
+        gzip = False
+
+        # The hook yields `operation`; the mocked temp file is named `filename`.
+        mock_hook.return_value.get_sdf_download_operation.return_value = operation
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
+
+        op = GoogleDisplayVideo360SDFtoGCSOperator(
+            operation_name=operation_name,
+            bucket_name=bucket_name,
+            object_name=object_name,
+            gzip=gzip,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            task_id="test_task",
+        )
+
+        op.execute(context=None)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
+        )
+
+        # assert_called_once_with also pins the call count, so no separate assert_called_once.
+        mock_hook.return_value.get_sdf_download_operation.assert_called_once_with(
+            operation_name=operation_name
+        )
+
+        # The fetched operation is used as download_media's resource_name.
+        mock_hook.return_value.download_media.assert_called_once_with(
+            resource_name=operation
+        )
+
+        # Use .return_value rather than calling download_media() so no extra call is recorded.
+        mock_hook.return_value.download_content_from_request.assert_called_once_with(
+            mock_temp.NamedTemporaryFile.return_value.__enter__.return_value,
+            mock_hook.return_value.download_media.return_value,
+            chunk_size=1024 * 1024,
+        )
+
+        # GCSHook is built with the operator's connection settings...
+        gcs_mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO
+        )
+
+        # ...and its upload() receives the temp file's name plus the target bucket/object.
+        gcs_mock_hook.return_value.upload.assert_called_once_with(
             bucket_name=bucket_name,
             object_name=object_name,
             filename=filename,
+            gzip=gzip,
+        )
+
+
+class TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator(TestCase):
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_execute(self, mock_hook):
+        # Sample SDF download task request; the operator must hand it to the hook verbatim.
+        body_request = {"version": "1", "id": "id", "filter": {"id": []}}
+        create_op = mock_hook.return_value.create_sdf_download_operation
+
+        task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
+            task_id="test_task",
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            body_request=body_request,
+        )
+        task.execute(context=None)
+
+        # The hook is built from the operator's connection settings...
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO
+        )
+
+        # ...and asked exactly once to create the download task with the given body.
+        create_op.assert_called_once()
+        create_op.assert_called_once_with(
+            body_request=body_request
         )
-        hook_mock.return_value.upload_line_items.assert_called_once()
-        hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items)
diff --git a/tests/providers/google/marketing_platform/operators/test_display_video_system.py b/tests/providers/google/marketing_platform/operators/test_display_video_system.py
index 8cc441d..7d4358d 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video_system.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video_system.py
@@ -16,14 +16,16 @@
 # under the License.
 import pytest
 
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY
 from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
 
 # Requires the following scope:
 SCOPES = [
     "https://www.googleapis.com/auth/doubleclickbidmanager",
-    "https://www.googleapis.com/auth/cloud-platform"
+    "https://www.googleapis.com/auth/cloud-platform",
+    "https://www.googleapis.com/auth/display-video"
 ]
 
 
@@ -37,7 +39,10 @@ class DisplayVideoSystemTest(GoogleSystemTest):
 
     def tearDown(self):
         self.delete_gcs_bucket(BUCKET)
-        super().tearDown()
+        with provide_gcp_context(GCP_BIGQUERY_KEY, scopes=SCOPES):
+            # Presumably the dataset created by the example DAG (contents included) - confirm.
+            BigQueryHook().delete_dataset(dataset_id='airflow_test', delete_contents=True)
+            super().tearDown()
 
     @provide_gcp_context(GMP_KEY, scopes=SCOPES)
     def test_run_example_dag(self):
diff --git a/tests/providers/google/marketing_platform/sensors/test_display_video.py b/tests/providers/google/marketing_platform/sensors/test_display_video.py
index 624b35b..2daca30 100644
--- a/tests/providers/google/marketing_platform/sensors/test_display_video.py
+++ b/tests/providers/google/marketing_platform/sensors/test_display_video.py
@@ -19,7 +19,7 @@
 from unittest import TestCase, mock
 
 from airflow.providers.google.marketing_platform.sensors.display_video import (
-    GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
 )
 
 API_VERSION = "api_version"
@@ -45,3 +45,26 @@ class TestGoogleDisplayVideo360ReportSensor(TestCase):
             gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
         )
         hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id)
+
+
+class TestGoogleDisplayVideo360GetSDFDownloadOperationSensor(TestCase):
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.sensors."
+        "display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_poke(self, hook_mock):
+        """poke() builds the hook and fetches the configured SDF download operation."""
+        operation_name = "operation_name"
+        op = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
+            operation_name=operation_name, api_version=API_VERSION, task_id="test_task",
+        )
+        op.poke(context=None)
+
+        # gcp_conn_id is not passed, so this assumes GCP_CONN_ID is the sensor's default.
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
+        )
+        # The operation must be requested by the name given to the sensor.
+        hook_mock.return_value.get_sdf_download_operation.assert_called_once_with(
+            operation_name=operation_name
+        )