You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 23:57:13 UTC

[airflow] branch main updated: `GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f55b9576b1 `GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)
f55b9576b1 is described below

commit f55b9576b1363d2cada0a25daf79501c8aad8b54
Author: Arkadiusz Rudny <93...@users.noreply.github.com>
AuthorDate: Sun Mar 5 00:57:01 2023 +0100

    `GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)
---
 airflow/providers/google/suite/hooks/drive.py      | 66 +++++++++++++++++++++-
 .../google/suite/transfers/local_to_drive.py       |  7 ++-
 tests/providers/google/suite/hooks/test_drive.py   | 56 +++++++++++++++++-
 .../google/suite/transfers/test_local_to_drive.py  |  2 +
 4 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/suite/hooks/drive.py b/airflow/providers/google/suite/hooks/drive.py
index 777cbc3e77..7519a9fd9b 100644
--- a/airflow/providers/google/suite/hooks/drive.py
+++ b/airflow/providers/google/suite/hooks/drive.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 from typing import IO, Any, Sequence
 
 from googleapiclient.discovery import Resource, build
+from googleapiclient.errors import Error as GoogleApiClientError
 from googleapiclient.http import HttpRequest, MediaFileUpload
 
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -159,6 +160,52 @@ class GoogleDriveHook(GoogleBaseHook):
             )
         )
 
+    def _get_file_info(self, file_id: str) -> dict:
+        """
+        Returns Google API file_info object containing id, name, parents in the response.
+
+        See https://developers.google.com/drive/api/v3/reference/files/get
+
+        :param file_id: id as string representation of interested file
+        :return: dict with the ``id`` and ``name`` of the file, plus ``parents`` when the API reports any
+        """
+        return (
+            self.get_conn()
+            .files()
+            .get(
+                fileId=file_id,
+                fields="id,name,parents",
+                supportsAllDrives=True,
+            )
+            .execute(num_retries=self.num_retries)  # same retry policy as the hook's other API calls
+        )
+
+    def _resolve_file_path(self, file_id: str) -> str:
+        """
+        Returns the full Google Drive path for given file_id.
+
+        Walks up the ``parents`` chain one item at a time, stopping at the first item that does
+        not report exactly one parent (e.g. a root folder returned without ``parents``), and
+        joins the collected names with ``/`` from the top-most item down to ``file_id``.
+
+        :param file_id: The id of a file in Google Drive
+        :return: Google Drive full path for a file
+        """
+        names: list[str] = []
+        current_file_id = file_id
+        while True:
+            # current_file_id can be file or directory id, Google API treats them the same way.
+            file_info = self._get_file_info(current_file_id)
+            names.append(f"{file_info['name']}")
+
+            # Google API returns parents array if there is at least one object inside
+            parents = file_info.get("parents", [])
+            if len(parents) != 1:
+                # https://developers.google.com/drive/api/guides/ref-single-parent
+                break
+            current_file_id = parents[0]
+        return "/".join(reversed(names))
+
     def get_file_id(
         self, folder_id: str, file_name: str, drive_id: str | None = None, *, include_trashed: bool = True
     ) -> dict:
@@ -213,6 +260,7 @@ class GoogleDriveHook(GoogleBaseHook):
         chunk_size: int = 100 * 1024 * 1024,
         resumable: bool = False,
         folder_id: str = "root",
+        show_full_target_path: bool = True,
     ) -> str:
         """
         Uploads a file that is available locally to a Google Drive service.
@@ -227,6 +275,7 @@ class GoogleDriveHook(GoogleBaseHook):
         :param resumable: True if this is a resumable upload. False means upload
             in a single request.
         :param folder_id: The base/root folder id for remote_location (part of the drive URL of a folder).
+        :param show_full_target_path: If true then it reveals full available file path in the logs.
         :return: File ID
         """
         service = self.get_conn()
@@ -243,8 +292,21 @@ class GoogleDriveHook(GoogleBaseHook):
             .create(body=file_metadata, media_body=media, fields="id", supportsAllDrives=True)
             .execute(num_retries=self.num_retries)
         )
-        self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
-        return file.get("id")
+        file_id = file.get("id")
+
+        if not show_full_target_path:
+            self.log.info("File %s has been uploaded successfully to gdrive", local_location)
+            return file_id
+        # Path resolution costs extra API calls, so it is only done when the path gets logged.
+        upload_location = remote_location
+        if folder_id != "root":
+            # Resolve the uploaded file itself (not folder_id) so sub-folders and its name are shown.
+            try:
+                upload_location = self._resolve_file_path(file_id)
+            except GoogleApiClientError as e:
+                self.log.warning("A problem has been encountered when trying to resolve file path: %s", e)
+        self.log.info("File %s uploaded to gdrive://%s.", local_location, upload_location)
+        return file_id
 
     def download_file(self, file_id: str, file_handle: IO, chunk_size: int = 100 * 1024 * 1024):
         """
diff --git a/airflow/providers/google/suite/transfers/local_to_drive.py b/airflow/providers/google/suite/transfers/local_to_drive.py
index 14b6f2da4b..7201732218 100644
--- a/airflow/providers/google/suite/transfers/local_to_drive.py
+++ b/airflow/providers/google/suite/transfers/local_to_drive.py
@@ -40,7 +40,8 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
         :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
 
     :param local_paths: Python list of local file paths
-    :param drive_folder: path of the Drive folder
+    :param drive_folder: path of the Drive folder, if folder_id param is given then drive_folder is a
+        sub path of folder_id.
     :param gcp_conn_id: Airflow Connection ID for GCP
     :param delete: should the local files be deleted after upload?
     :param ignore_if_missing: if True, then don't fail even if all files
@@ -64,6 +65,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account
     :param folder_id: The base/root folder id for each local path in the Drive folder
+    :param show_full_target_path: If true then it reveals full available file path in the logs.
     :return: Remote file ids after upload
     """
 
@@ -84,6 +86,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         folder_id: str = "root",
+        show_full_target_path: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -97,6 +100,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
         self.folder_id = folder_id
+        self.show_full_target_path = show_full_target_path
 
     def execute(self, context: Context) -> list[str]:
         hook = GoogleDriveHook(
@@ -117,6 +121,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
                     chunk_size=self.chunk_size,
                     resumable=self.resumable,
                     folder_id=self.folder_id,
+                    show_full_target_path=self.show_full_target_path,
                 )
 
                 remote_file_ids.append(remote_file_id)
diff --git a/tests/providers/google/suite/hooks/test_drive.py b/tests/providers/google/suite/hooks/test_drive.py
index d132307c02..14f8ecf7ec 100644
--- a/tests/providers/google/suite/hooks/test_drive.py
+++ b/tests/providers/google/suite/hooks/test_drive.py
@@ -273,6 +273,27 @@ class TestGoogleDriveHook:
         result_value = self.gdrive_hook.get_file_id(folder_id, file_name, drive_id)
         assert result_value == {"id": "ID_1", "mime_type": "text/plain"}
 
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    def test_resolve_file_path_when_file_in_root_directory(self, mock_get_conn):
+        mock_get = mock_get_conn.return_value.files.return_value.get
+        mock_get.return_value.execute.side_effect = [
+            {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
+            {"id": "ID_2", "name": "root"},
+        ]
+        assert self.gdrive_hook._resolve_file_path(file_id="ID_1") == "root/file.csv"
+        assert [c[1]["fileId"] for c in mock_get.call_args_list] == ["ID_1", "ID_2"]
+
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    def test_resolve_file_path_when_file_nested_in_2_directories(self, mock_get_conn):
+        mock_get = mock_get_conn.return_value.files.return_value.get
+        mock_get.return_value.execute.side_effect = [
+            {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
+            {"id": "ID_2", "name": "folder_A", "parents": ["ID_3"]},
+            {"id": "ID_3", "name": "root"},
+        ]
+        assert self.gdrive_hook._resolve_file_path(file_id="ID_1") == "root/folder_A/file.csv"
+        assert [c[1]["fileId"] for c in mock_get.call_args_list] == ["ID_1", "ID_2", "ID_3"]
+
     @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
     def test_get_file_id_when_multiple_files_exists(self, mock_get_conn):
         folder_id = "abxy1z"
@@ -300,8 +321,9 @@ class TestGoogleDriveHook:
     @mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
     @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
     @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
     def test_upload_file_to_root_directory(
-        self, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
+        self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
     ):
         mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
             "id": "FILE_ID"
@@ -310,6 +332,7 @@ class TestGoogleDriveHook:
         return_value = self.gdrive_hook.upload_file("local_path", "remote_path")
 
         mock_ensure_folders_exists.assert_not_called()
+        mock_resolve_file_path.assert_not_called()
         mock_get_conn.assert_has_calls(
             [
                 mock.call()
@@ -353,3 +376,34 @@ class TestGoogleDriveHook:
             ]
         )
         assert return_value == "FILE_ID"
+
+    @mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
+    def test_upload_file_into_folder_id(
+        self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
+    ):
+        file_id = "FILE_ID"
+        folder_id = "FOLDER_ID"
+        mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
+            "id": file_id
+        }
+        mock_resolve_file_path.return_value = "Shared_Folder_A/Folder_B"  # only used for logging
+        return_value = self.gdrive_hook.upload_file("/tmp/file.csv", "/file.csv", folder_id=folder_id)
+
+        mock_resolve_file_path.assert_called_once()
+        mock_ensure_folders_exists.assert_not_called()
+        mock_get_conn.assert_has_calls(
+            [
+                mock.call()
+                .files()
+                .create(
+                    body={"name": "file.csv", "parents": [folder_id]},
+                    fields="id",
+                    media_body=mock_media_file_upload.return_value,
+                    supportsAllDrives=True,
+                )
+            ]
+        )
+        assert return_value == file_id
diff --git a/tests/providers/google/suite/transfers/test_local_to_drive.py b/tests/providers/google/suite/transfers/test_local_to_drive.py
index affc4f1f7e..74caaeeb93 100644
--- a/tests/providers/google/suite/transfers/test_local_to_drive.py
+++ b/tests/providers/google/suite/transfers/test_local_to_drive.py
@@ -48,6 +48,7 @@ class TestLocalFilesystemToGoogleDriveOperator:
                 chunk_size=100 * 1024 * 1024,
                 resumable=False,
                 folder_id="some_folder_id",
+                show_full_target_path=True,
             ),
             mock.call(
                 local_location="test2",
@@ -55,6 +56,7 @@ class TestLocalFilesystemToGoogleDriveOperator:
                 chunk_size=100 * 1024 * 1024,
                 resumable=False,
                 folder_id="some_folder_id",
+                show_full_target_path=True,
             ),
         ]
         mock_hook.return_value.upload_file.assert_has_calls(calls)