You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 23:57:13 UTC
[airflow] branch main updated: `GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f55b9576b1 `GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)
f55b9576b1 is described below
commit f55b9576b1363d2cada0a25daf79501c8aad8b54
Author: Arkadiusz Rudny <93...@users.noreply.github.com>
AuthorDate: Sun Mar 5 00:57:01 2023 +0100
`GoogleDriveHook`: Fixing log message + adding more verbose documentation (#29694)
---
airflow/providers/google/suite/hooks/drive.py | 66 +++++++++++++++++++++-
.../google/suite/transfers/local_to_drive.py | 7 ++-
tests/providers/google/suite/hooks/test_drive.py | 56 +++++++++++++++++-
.../google/suite/transfers/test_local_to_drive.py | 2 +
4 files changed, 127 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/suite/hooks/drive.py b/airflow/providers/google/suite/hooks/drive.py
index 777cbc3e77..7519a9fd9b 100644
--- a/airflow/providers/google/suite/hooks/drive.py
+++ b/airflow/providers/google/suite/hooks/drive.py
@@ -21,6 +21,7 @@ from __future__ import annotations
from typing import IO, Any, Sequence
from googleapiclient.discovery import Resource, build
+from googleapiclient.errors import Error as GoogleApiClientError
from googleapiclient.http import HttpRequest, MediaFileUpload
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -159,6 +160,52 @@ class GoogleDriveHook(GoogleBaseHook):
)
)
+    def _get_file_info(self, file_id: str) -> dict:
+        """
+        Return Drive metadata (``id``, ``name`` and ``parents``) for a file or folder.
+        See https://developers.google.com/drive/api/v3/reference/files/get
+
+        :param file_id: The id of the file or folder to look up.
+        :return: The ``files.get`` response; ``parents`` is absent when there are none.
+        """
+        file_info = (
+            self.get_conn()
+            .files()
+            .get(
+                fileId=file_id,
+                fields="id,name,parents",
+                supportsAllDrives=True,
+            )
+            .execute(num_retries=self.num_retries)
+        )
+        return file_info
+
+    def _resolve_file_path(self, file_id: str) -> str:
+        """
+        Build the full Google Drive path of ``file_id`` by walking up its parent chain.
+
+        :param file_id: The id of a file (or folder) in Google Drive
+        :return: Names from the top-most reachable ancestor down to ``file_id``, joined with "/"
+        """
+        names: list[str] = []
+        current_id = file_id
+        while True:
+            # Files and folders are looked up through the same API call.
+            info = self._get_file_info(current_id)
+            names.append(f'{info["name"]}')
+
+            # The API omits ``parents`` when there are none. Only a single-parent chain is
+            # followed: https://developers.google.com/drive/api/guides/ref-single-parent
+            parents = info["parents"] if "parents" in info else []
+            if len(parents) != 1:
+                break
+            current_id = parents[0]
+
+        # Names were collected leaf-first; the path reads root-first.
+        names.reverse()
+        path = "/".join(names)
+        return path
+
def get_file_id(
self, folder_id: str, file_name: str, drive_id: str | None = None, *, include_trashed: bool = True
) -> dict:
@@ -213,6 +260,7 @@ class GoogleDriveHook(GoogleBaseHook):
chunk_size: int = 100 * 1024 * 1024,
resumable: bool = False,
folder_id: str = "root",
+ show_full_target_path: bool = True,
) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
@@ -227,6 +275,7 @@ class GoogleDriveHook(GoogleBaseHook):
:param resumable: True if this is a resumable upload. False means upload
in a single request.
:param folder_id: The base/root folder id for remote_location (part of the drive URL of a folder).
+ :param show_full_target_path: If true then it reveals full available file path in the logs.
:return: File ID
"""
service = self.get_conn()
@@ -243,8 +292,25 @@ class GoogleDriveHook(GoogleBaseHook):
             .create(body=file_metadata, media_body=media, fields="id", supportsAllDrives=True)
             .execute(num_retries=self.num_retries)
         )
-        self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
-        return file.get("id")
+        file_id = file.get("id")
+
+        upload_location = remote_location
+
+        # Resolving the path costs one extra API call per ancestor folder, so it is skipped
+        # when the path is not going to be logged.
+        if show_full_target_path and folder_id != "root":
+            try:
+                folder_path = self._resolve_file_path(folder_id)
+                upload_location = f"{folder_path}/{remote_location.lstrip('/')}"
+            except GoogleApiClientError as e:
+                # The upload has already succeeded; a failed lookup only degrades the log line.
+                self.log.warning("A problem has been encountered when trying to resolve file path: %s", e)
+
+        if show_full_target_path:
+            self.log.info("File %s uploaded to gdrive://%s.", local_location, upload_location)
+        else:
+            self.log.info("File %s has been uploaded successfully to gdrive", local_location)
+        return file_id
def download_file(self, file_id: str, file_handle: IO, chunk_size: int = 100 * 1024 * 1024):
"""
diff --git a/airflow/providers/google/suite/transfers/local_to_drive.py b/airflow/providers/google/suite/transfers/local_to_drive.py
index 14b6f2da4b..7201732218 100644
--- a/airflow/providers/google/suite/transfers/local_to_drive.py
+++ b/airflow/providers/google/suite/transfers/local_to_drive.py
@@ -40,7 +40,8 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
:ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
:param local_paths: Python list of local file paths
- :param drive_folder: path of the Drive folder
+ :param drive_folder: path of the Drive folder, if folder_id param is given then drive_folder is a
+ sub path of folder_id.
:param gcp_conn_id: Airflow Connection ID for GCP
:param delete: should the local files be deleted after upload?
:param ignore_if_missing: if True, then don't fail even if all files
@@ -64,6 +65,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account
:param folder_id: The base/root folder id for each local path in the Drive folder
+ :param show_full_target_path: If true then it reveals full available file path in the logs.
:return: Remote file ids after upload
"""
@@ -84,6 +86,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
folder_id: str = "root",
+ show_full_target_path: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -97,6 +100,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.folder_id = folder_id
+ self.show_full_target_path = show_full_target_path
def execute(self, context: Context) -> list[str]:
hook = GoogleDriveHook(
@@ -117,6 +121,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
chunk_size=self.chunk_size,
resumable=self.resumable,
folder_id=self.folder_id,
+ show_full_target_path=self.show_full_target_path,
)
remote_file_ids.append(remote_file_id)
diff --git a/tests/providers/google/suite/hooks/test_drive.py b/tests/providers/google/suite/hooks/test_drive.py
index d132307c02..14f8ecf7ec 100644
--- a/tests/providers/google/suite/hooks/test_drive.py
+++ b/tests/providers/google/suite/hooks/test_drive.py
@@ -273,6 +273,27 @@ class TestGoogleDriveHook:
result_value = self.gdrive_hook.get_file_id(folder_id, file_name, drive_id)
assert result_value == {"id": "ID_1", "mime_type": "text/plain"}
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    def test_resolve_file_path_when_file_in_root_directory(self, mock_get_conn):
+        mock_execute = mock_get_conn.return_value.files.return_value.get.return_value.execute
+        mock_execute.side_effect = [
+            {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
+            {"id": "ID_2", "name": "root"},  # no "parents" key: top of the chain
+        ]
+
+        assert self.gdrive_hook._resolve_file_path(file_id="ID_1") == "root/file.csv"
+
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    def test_resolve_file_path_when_file_nested_in_2_directories(self, mock_get_conn):
+        mock_execute = mock_get_conn.return_value.files.return_value.get.return_value.execute
+        mock_execute.side_effect = [
+            {"id": "ID_1", "name": "file.csv", "parents": ["ID_2"]},
+            {"id": "ID_2", "name": "folder_A", "parents": ["ID_3"]},
+            {"id": "ID_3", "name": "root"},
+        ]
+
+        assert self.gdrive_hook._resolve_file_path(file_id="ID_1") == "root/folder_A/file.csv"
+
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
def test_get_file_id_when_multiple_files_exists(self, mock_get_conn):
folder_id = "abxy1z"
@@ -300,8 +321,9 @@ class TestGoogleDriveHook:
@mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
@mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
+ @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
def test_upload_file_to_root_directory(
- self, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
+ self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
):
mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
"id": "FILE_ID"
@@ -310,6 +332,7 @@ class TestGoogleDriveHook:
return_value = self.gdrive_hook.upload_file("local_path", "remote_path")
mock_ensure_folders_exists.assert_not_called()
+ mock_resolve_file_path.assert_not_called()
mock_get_conn.assert_has_calls(
[
mock.call()
@@ -353,3 +376,36 @@ class TestGoogleDriveHook:
             ]
         )
         assert return_value == "FILE_ID"
+
+    @mock.patch("airflow.providers.google.suite.hooks.drive.MediaFileUpload")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook.get_conn")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._ensure_folders_exists")
+    @mock.patch("airflow.providers.google.suite.hooks.drive.GoogleDriveHook._resolve_file_path")
+    def test_upload_file_into_folder_id(
+        self, mock_resolve_file_path, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
+    ):
+        file_id = "FILE_ID"
+        folder_id = "FOLDER_ID"
+        mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
+            "id": file_id
+        }
+        mock_resolve_file_path.return_value = "Shared_Folder_A/Folder_B"  # path for FOLDER_ID
+
+        return_value = self.gdrive_hook.upload_file("/tmp/file.csv", "/file.csv", folder_id=folder_id)
+
+        mock_ensure_folders_exists.assert_not_called()
+        # A non-root folder_id (with the default show_full_target_path) triggers one path lookup.
+        mock_resolve_file_path.assert_called_once_with(folder_id)
+        mock_get_conn.assert_has_calls(
+            [
+                mock.call()
+                .files()
+                .create(
+                    body={"name": "file.csv", "parents": [folder_id]},
+                    fields="id",
+                    media_body=mock_media_file_upload.return_value,
+                    supportsAllDrives=True,
+                )
+            ]
+        )
+        assert return_value == file_id
diff --git a/tests/providers/google/suite/transfers/test_local_to_drive.py b/tests/providers/google/suite/transfers/test_local_to_drive.py
index affc4f1f7e..74caaeeb93 100644
--- a/tests/providers/google/suite/transfers/test_local_to_drive.py
+++ b/tests/providers/google/suite/transfers/test_local_to_drive.py
@@ -48,6 +48,7 @@ class TestLocalFilesystemToGoogleDriveOperator:
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
+ show_full_target_path=True,
),
mock.call(
local_location="test2",
@@ -55,6 +56,7 @@ class TestLocalFilesystemToGoogleDriveOperator:
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
+ show_full_target_path=True,
),
]
mock_hook.return_value.upload_file.assert_has_calls(calls)