Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/06 18:31:14 UTC

[GitHub] [airflow] michalslowikowski00 opened a new pull request #8740: Added SDFtoGCSOperator

michalslowikowski00 opened a new pull request #8740:
URL: https://github.com/apache/airflow/pull/8740


   Added handling of SDF entities (creating an operation task and waiting for that operation task) and saving them in Google Cloud Storage.
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421095020



##########
File path: airflow/providers/google/marketing_platform/hooks/display_video.py
##########
@@ -170,3 +184,51 @@ def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
             .execute(num_retries=self.num_retries)
         )
         return response["lineItems"]
+
+    def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Creates an SDF Download Task and Returns an Operation.
+        :param body_request: Body request.

Review comment:
       ```suggestion
   
           :param body_request: Body request.
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421234132



##########
File path: airflow/providers/google/marketing_platform/hooks/display_video.py
##########
@@ -170,3 +184,51 @@ def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
             .execute(num_retries=self.num_retries)
         )
         return response["lineItems"]
+
+    def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Creates an SDF Download Task and Returns an Operation.
+        :param body_request: Body request.

Review comment:
       Hm... Ok, thanks. 
   I did not have any issues when I built the documentation locally. I will fix it.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421234575



##########
File path: tests/providers/google/marketing_platform/hooks/test_display_video.py
##########
@@ -239,3 +257,163 @@ def test_upload_line_items_should_return_equal_values(self, get_conn_mock):
         result = self.hook.upload_line_items(line_items)
 
         self.assertEqual(return_value, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_with_params(
+        self, get_conn_to_display_video
+    ):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once_with(
+            body=body_request
+        )
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_return_equal_values(
+        self, get_conn_to_display_video
+    ):
+        response = ["name"]
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            create.return_value\
+            .execute.return_value = response
+
+        result = self.hook.create_sdf_download_operation(body_request=body_request)
+        self.assertEqual(response, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_with_params(self, get_conn_to_display_video):
+        operation_name = "operation_name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.\
+            get.assert_called_once_with(name=operation_name)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        operation_name = "name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.\
+            get.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def get_sdf_download_tasks_return_equal_values(self, get_conn_to_display_video):
+        operation_name = "operation"
+        response = "reposonse"
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.get = response
+
+        result = self.hook.get_sdf_download_operation(operation_name=operation_name)
+
+        self.assertEqual(operation_name, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once_with_params(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once_with(resource_name=resource_name)
+
+    # @mock.patch(

Review comment:
       Done.







[GitHub] [airflow] mik-laj commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421094686



##########
File path: airflow/providers/google/marketing_platform/example_dags/example_display_video.py
##########
@@ -19,22 +19,34 @@
 Example Airflow DAG that shows how to use DisplayVideo.
 """
 import os
+from typing import Dict
 
 from airflow import models
+from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.marketing_platform.operators.display_video import (
-    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
-    GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
-    GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+    GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+    GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+    GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
 )
 from airflow.providers.google.marketing_platform.sensors.display_video import (
-    GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
 )
 from airflow.utils import dates
 
 # [START howto_display_video_env_variables]
 BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
 ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
 OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
+PATH_TO_UPLOAD_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
+)
+PATH_TO_SAVED_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
+)
+BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+BQ_DATASET = "airflow_test"

Review comment:
       Can you parameterize it via env var?
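       A minimal sketch of what this parameterization could look like, following the `os.environ.get` pattern already used for the other constants in the file. The variable name `GMP_BQ_DATASET` is an assumption made for illustration; the thread does not name one.

```python
import os

# "GMP_BQ_DATASET" is a hypothetical environment variable name, chosen only
# to match the GMP_ prefix of the neighbouring constants.
# "airflow_test" stays as the fallback so existing behaviour is unchanged
# when the variable is not set.
BQ_DATASET = os.environ.get("GMP_BQ_DATASET", "airflow_test")
```

       With this shape, the dataset can be overridden per environment without editing the example DAG.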







[GitHub] [airflow] mik-laj commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421095405



##########
File path: tests/providers/google/marketing_platform/hooks/test_display_video.py
##########
@@ -239,3 +257,163 @@ def test_upload_line_items_should_return_equal_values(self, get_conn_mock):
         result = self.hook.upload_line_items(line_items)
 
         self.assertEqual(return_value, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_with_params(
+        self, get_conn_to_display_video
+    ):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once_with(
+            body=body_request
+        )
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        self.hook.create_sdf_download_operation(body_request=body_request)
+
+        get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_create_sdf_download_tasks_return_equal_values(
+        self, get_conn_to_display_video
+    ):
+        response = ["name"]
+        body_request = {
+            "version": "version",
+            "partnerId": "partner_id",
+            "advertiserId": "advertiser_id",
+            "parentEntityFilter": "parent_entity_filter",
+            "idFilter": "id_filter",
+            "inventorySourceFilter": "inventory_source_filter",
+        }
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            create.return_value\
+            .execute.return_value = response
+
+        result = self.hook.create_sdf_download_operation(body_request=body_request)
+        self.assertEqual(response, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_with_params(self, get_conn_to_display_video):
+        operation_name = "operation_name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.\
+            get.assert_called_once_with(name=operation_name)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_get_sdf_download_tasks_called_once(self, get_conn_to_display_video):
+        operation_name = "name"
+        self.hook.get_sdf_download_operation(operation_name=operation_name)
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.\
+            get.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def get_sdf_download_tasks_return_equal_values(self, get_conn_to_display_video):
+        operation_name = "operation"
+        response = "reposonse"
+
+        get_conn_to_display_video.return_value.\
+            sdfdownloadtasks.return_value.\
+            operation.return_value.get = response
+
+        result = self.hook.get_sdf_download_operation(operation_name=operation_name)
+
+        self.assertEqual(operation_name, result)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once()
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video"
+    )
+    def test_download_media_called_once_with_params(self, get_conn_to_display_video):
+        resource_name = "resource_name"
+
+        self.hook.download_media(resource_name=resource_name)
+        get_conn_to_display_video.return_value.\
+            media.return_value.\
+            download_media.assert_called_once_with(resource_name=resource_name)
+
+    # @mock.patch(

Review comment:
       ?







[GitHub] [airflow] mik-laj commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421095134



##########
File path: airflow/providers/google/marketing_platform/hooks/display_video.py
##########
@@ -170,3 +184,51 @@ def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
             .execute(num_retries=self.num_retries)
         )
         return response["lineItems"]
+
+    def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Creates an SDF Download Task and Returns an Operation.
+        :param body_request: Body request.

Review comment:
       The documentation will not be built without it.
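       A sketch of the docstring layout this comment asks for, assuming the build problem comes from reStructuredText expecting a blank line between the summary paragraph and the `:param` field list. The function is written here as a standalone function with a placeholder body; it is not the hook's real implementation.

```python
from typing import Any, Dict


def create_sdf_download_operation(body_request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates an SDF Download Task and returns an Operation.

    :param body_request: Body request.
    """
    # Placeholder body for illustration only; the real hook method sends
    # the request to the Display & Video 360 API.
    return {"request": body_request}
```

       The only change relevant to the review is the empty line after the summary sentence.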







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #8740: Added SDFtoGCSOperator

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #8740:
URL: https://github.com/apache/airflow/pull/8740#discussion_r421234757



##########
File path: airflow/providers/google/marketing_platform/example_dags/example_display_video.py
##########
@@ -19,22 +19,34 @@
 Example Airflow DAG that shows how to use DisplayVideo.
 """
 import os
+from typing import Dict
 
 from airflow import models
+from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.marketing_platform.operators.display_video import (
-    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
-    GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
-    GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
+    GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
+    GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
+    GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
+    GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
 )
 from airflow.providers.google.marketing_platform.sensors.display_video import (
-    GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
 )
 from airflow.utils import dates
 
 # [START howto_display_video_env_variables]
 BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
 ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
 OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
+PATH_TO_UPLOAD_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
+)
+PATH_TO_SAVED_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
+)
+BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+BQ_DATASET = "airflow_test"

Review comment:
       Ok.



