Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/13 14:52:14 UTC

[GitHub] [airflow] ulsc opened a new pull request #22219: Gdrive upload operator

ulsc opened a new pull request #22219:
URL: https://github.com/apache/airflow/pull/22219


   Adds an operator that uploads a list of local files to a Google Drive folder, along with a new operators module that can host further Google Drive operators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067870149


   I'll be fixing that later today @Bowrna





[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067288572


   `drive_folder` is also a template field, so to fully support Jinja templating we need to change the path composition a bit.
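   A minimal usage sketch, assuming both fields stay in `template_fields`; the task id, file name and folder name are hypothetical:
   ```python
   from airflow.providers.google.suite.operators.drive import GoogleDriveUploadOperator

   # Both local_paths and drive_folder are template fields, so Jinja renders them per DAG run.
   upload_report = GoogleDriveUploadOperator(
       task_id="upload_daily_report",
       local_paths=["/tmp/report_{{ ds }}.csv"],  # hypothetical local file
       drive_folder="reports/{{ ds }}",           # hypothetical Drive folder
       gcp_conn_id="google_cloud_default",
   )
   ```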





[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825461523



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       Must we default to a specific version?
   
   The Google provider is full of pinned versions:
   https://github.com/apache/airflow/blob/a840561e1f00ce026a813039e430a085d2cfe35e/airflow/providers/google/cloud/operators/datafusion.py#L166
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/marketing_platform/operators/display_video.py#L76
   
   
   This means we need to remember to bump it when the version becomes deprecated or outdated, and doing so is a breaking change.
   Does Google offer a default version that comes with its own client library,
   like Facebook does (for example)?
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py#L65-L66
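   One hedged alternative, with illustrative names: keep `api_version` optional in the operator and forward it to the hook only when the user sets it, so the hook's (or client library's) default stays the single source of truth:
   ```python
   from typing import Any, Dict, Optional


   def build_hook_kwargs(gcp_conn_id: str, api_version: Optional[str] = None) -> Dict[str, Any]:
       """Forward api_version only when it is explicitly overridden by the user."""
       kwargs: Dict[str, Any] = {"gcp_conn_id": gcp_conn_id}
       if api_version is not None:
           kwargs["api_version"] = api_version
       return kwargs


   print(build_hook_kwargs("google_cloud_default"))        # no api_version -> hook default applies
   print(build_hook_kwargs("google_cloud_default", "v3"))  # explicit override is passed through
   ```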
   
   







[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825490843



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+            file_name = local_path.split('/')[-1]

Review comment:
       I would suggest using `Path` from [pathlib](https://docs.python.org/3/library/pathlib.html) so you can do `local_path.name` instead of splitting the string.
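   A short sketch of the difference, with a hypothetical path:
   ```python
   from pathlib import Path

   local_path = "/tmp/exports/report.csv"

   # current approach: manual string splitting on "/"
   file_name = local_path.split("/")[-1]   # "report.csv"

   # pathlib: clearer, and works whether local_path is a str or a Path
   file_name = Path(local_path).name       # "report.csv"
   ```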







[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825490206



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,

Review comment:
       Can we use the default connection name (`google_cloud_default`) as we do in other Google operators?







[GitHub] [airflow] eladkal commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068461138


   > so there is no transfer between two services
   
   There is... and we have an automated way to handle it properly with source/target entries in the provider yaml. Example:
   https://github.com/apache/airflow/blob/16adc035b1ecdf533f44fbb3e32bea972127bb71/airflow/providers/microsoft/azure/provider.yaml#L169
   
   The operator name should be `LocalFilesystemToGoogleDriveOperator`.
   The `LocalFilesystemTo*` prefix is the convention:
   ```
   LocalFilesystemToGCSOperator
   LocalFilesystemToS3Operator
   LocalFilesystemToADLSOperator
   LocalFilesystemToWasbOperator
   ```
   The file should be called `local_to_drive.py` and be placed in the transfers folder.
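   For reference, a hedged sketch of what the matching `transfers` entry in the provider.yaml could look like (the integration names and how-to-guide path are assumptions based on the usual layout):
   ```yaml
   transfers:
     - source-integration-name: Local
       target-integration-name: Google Drive
       python-module: airflow.providers.google.suite.transfers.local_to_drive
       how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/local_to_drive.rst
   ```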
   
   





[GitHub] [airflow] eladkal edited a comment on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068461138









[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830594496



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File %s can't be found", local_path)

Review comment:
       I prefer not to be picky and would catch all `OSError`, but maybe that is just me :)
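   For completeness, a tiny sketch of the broader catch; since `FileNotFoundError` is a subclass of `OSError`, this also covers permission and "is a directory" errors:
   ```python
   import os


   def remove_if_possible(path: str) -> None:
       """Try to delete a local file, logging instead of failing when it cannot be removed."""
       try:
           os.remove(path)
       except OSError as err:  # FileNotFoundError, PermissionError, IsADirectoryError, ...
           print(f"Could not remove {path}: {err}")


   remove_if_possible("/tmp/does_not_exist.txt")  # hypothetical path; prints a warning-style message
   ```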







[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830580013



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       cool, wasn't aware of that.







[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825468633



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       I understand, but anyway, if the user wants to change the API version directly from the operator, this may require a breaking change in the underlying hook. So if api_version is going to change, it should be changed at the hook level.







[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825490555



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+            file_name = local_path.split('/')[-1]
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=local_path,
+                    remote_location=os.path.join(self.drive_folder, file_name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.error(f"File {local_path} can't be found")

Review comment:
       Should this be a warning? Personally, I would log an error only if we were going to fail when the file does not exist.







[GitHub] [airflow] josh-fell commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r827234851



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:

Review comment:
       ```suggestion
       def execute(self, context: "Context") -> List[str]:
   ```
    More accurate since we know the specific sequence type being returned (this would also need `List` added to the `typing` import).

##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")
+
+        self.xcom_push(context, "remote_file_ids", remote_file_ids)
+        return remote_file_ids

Review comment:
       This seems redundant. When the operator completes there will be two `XComs` with the same information under different keys: "return_value" and "remote_file_ids". The suggestion below feels more straightforward, but I have no strong opinion on which one is used (a short downstream-read sketch follows the suggestion).
   ```suggestion
           return remote_file_ids
   ```
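   For reference, a small downstream-read sketch (the task ids are hypothetical) showing why the extra key is not needed:
   ```python
   from airflow.operators.python import PythonOperator


   def read_ids(ti):
       # With the explicit xcom_push call, the ids are available under a custom key ...
       custom = ti.xcom_pull(task_ids="upload_files_to_drive", key="remote_file_ids")
       # ... but because execute() also returns them, the default key already holds the same list.
       default = ti.xcom_pull(task_ids="upload_files_to_drive")  # key defaults to "return_value"
       return custom or default


   read_remote_ids = PythonOperator(task_id="read_remote_ids", python_callable=read_ids)
   ```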

##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       Should this operator fail if _all_ of the files are not found? Right now the task would succeed and might give users the false sense that files were uploaded as expected. WDYT?
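   One hedged way to guard against that, sketched as a standalone check (the `ignore_if_missing` flag is an assumption, mirroring the opt-out added in a later revision of this PR):
   ```python
   from airflow.exceptions import AirflowFailException


   def check_upload_results(local_paths, remote_file_ids, ignore_if_missing=False):
       """Fail the task when some files could not be uploaded, unless the user opted out."""
       missing = len(local_paths) - len(remote_file_ids)
       if missing and not ignore_if_missing:
           raise AirflowFailException(f"{missing} file(s) could not be uploaded to Google Drive")


   check_upload_results(["a.txt", "b.txt"], ["id_a", "id_b"])  # all uploaded -> passes silently
   ```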

##########
File path: tests/providers/google/suite/operators/test_drive.py
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pathlib import Path
+from unittest import mock
+
+from airflow.providers.google.suite.operators.drive import GoogleDriveUploadOperator
+
+GCP_CONN_ID = "test"
+DRIVE_FOLDER = Path("test_folder")
+LOCAL_PATHS = [Path("test1"), Path("test2")]
+REMOTE_FILE_IDS = ["rtest1", "rtest2"]
+
+
+class TestGoogleDriveUpload:
+    @mock.patch("airflow.providers.google.suite.operators.drive.GoogleDriveHook")
+    @mock.patch("airflow.providers.google.suite.operators.drive.GoogleDriveUploadOperator.xcom_push")
+    def test_execute(self, mock_xcom, mock_hook):
+        context = {}
+        mock_hook.return_value.upload_file.return_value = REMOTE_FILE_IDS
+        op = GoogleDriveUploadOperator(
+            task_id="test_task", local_paths=LOCAL_PATHS, drive_folder=DRIVE_FOLDER, gcp_conn_id=GCP_CONN_ID
+        )
+        op.execute(context)
+
+        calls = [
+            mock.call(
+                local_location="test1",
+                remote_location="test_folder/test1",
+                chunk_size=100 * 1024 * 1024,
+                resumable=False,
+            ),
+            mock.call(
+                local_location="test2",
+                remote_location="test_folder/test2",
+                chunk_size=100 * 1024 * 1024,
+                resumable=False,
+            ),
+        ]
+        mock_hook.return_value.upload_file.assert_has_calls(calls)
+
+        xcom_calls = [
+            mock.call(context, "remote_file_ids", REMOTE_FILE_IDS),
+        ]
+        mock_xcom.has_calls(xcom_calls)

Review comment:
       ```suggestion
           mock_xcom.assert_has_calls(xcom_calls)
       ```
   I believe this should use `assert_has_calls()` instead of `has_calls()`, based on #20453.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1073234215


   CI indeed helps us :)
   An example DAG is missing for this operator:
   https://github.com/apache/airflow/runs/5617058321?check_suite_focus=true#step:8:892
   ```
             with self.subTest("Detect missing example dags"):
                 missing_example = {s for s in operator_sets if not has_example_dag(s)}
                 missing_example -= self.MISSING_EXAMPLE_DAGS
     >           assert set() == missing_example
     E           AssertionError: assert set() == {('suite', 'local_to_drive')}
     E             Extra items in the right set:
     E             ('suite', 'local_to_drive')
     E             Use -v to get the full diff
   ```
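   For reference, a minimal sketch of what such an example DAG could look like, assuming the module lands as `suite/transfers/local_to_drive.py`; the DAG id, file paths and folder name below are placeholders, not taken from this PR:
   ```python
   from datetime import datetime

   from airflow import models
   from airflow.providers.google.suite.transfers.local_to_drive import (
       LocalFilesystemToGoogleDriveOperator,
   )

   with models.DAG(
       "example_local_to_drive",
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       catchup=False,
   ) as dag:
       # Upload two local files into a Drive folder
       upload_files = LocalFilesystemToGoogleDriveOperator(
           task_id="upload_files_to_drive",
           local_paths=["/tmp/file_1.txt", "/tmp/file_2.txt"],
           drive_folder="uploaded-from-airflow",
           gcp_conn_id="google_cloud_default",
       )
   ```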


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068451285


   How about changing `GoogleDriveToLocalOperator` to `GoogleDriveDownloadOperator` and making it a regular operator rather than a transfer operator? Neither of them uses more than one service, so there is no transfer between two services, and there may be operators like `GoogleDriveMoveFileOperator` in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825461523



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       Must we pin the version?
   
   The Google provider is full of pinned versions:
   https://github.com/apache/airflow/blob/a840561e1f00ce026a813039e430a085d2cfe35e/airflow/providers/google/cloud/operators/datafusion.py#L166
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/marketing_platform/operators/display_video.py#L76
   
   
   This means we need to remember to bump it, and bumping it is a breaking change.
   Does Google offer a default version that comes within its own lib,
   like Facebook has (for example)?
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py#L65-L66
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825468164



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       I don't think removing it is the way to go.
   If there are multiple versions of the API, users might want to set the one they wish.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r826289304



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Sequence[Path],

Review comment:
       I would also allow `Sequence[str]` to support templating this field. I don't think Jinja will resolve `Path` objects.
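   For illustration, a rough sketch of what templated string paths could look like (DAG id and file names are made up; at this point in the PR the class is still `GoogleDriveUploadOperator`):
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.google.suite.operators.drive import GoogleDriveUploadOperator

   # Strings in 'local_paths' are rendered by Jinja because the field is in
   # template_fields; Path objects would be passed through untouched.
   with DAG("gdrive_upload_sketch", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
       upload = GoogleDriveUploadOperator(
           task_id="upload_daily_report",
           local_paths=["/tmp/report_{{ ds }}.csv"],  # e.g. /tmp/report_2022-03-13.csv at runtime
           drive_folder="reports",
           gcp_conn_id="google_cloud_default",
       )
   ```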




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067909036


   @potiuk any ideas on how to solve the current failure? It's working fine, but mypy doesn't like the type casting, I guess?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067266920


   Added it so we might add it to the new release of providers :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825462049



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       actually, we can get rid of it altogether, because it's passed to the hook and the hook already has the version default. But, 

##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       actually, we can get rid of it altogether, because it's passed to the hook and the hook already has the version default.
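   
   To illustrate the idea (the helper name below is hypothetical, not part of the PR): rely on the hook's default while still letting callers override, by only forwarding `api_version` when it is explicitly set.
   ```python
   from typing import Any, Dict, Optional

   # Hypothetical sketch: build the hook kwargs so that api_version is only
   # forwarded when explicitly set, leaving GoogleDriveHook's own default in place.
   def build_hook_kwargs(gcp_conn_id: str, api_version: Optional[str] = None) -> Dict[str, Any]:
       kwargs: Dict[str, Any] = {"gcp_conn_id": gcp_conn_id}
       if api_version is not None:
           kwargs["api_version"] = api_version
       return kwargs

   print(build_hook_kwargs("google_cloud_default"))        # hook default applies
   print(build_hook_kwargs("google_cloud_default", "v3"))  # explicit override
   ```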




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825502086



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,

Review comment:
       sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825490158



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,

Review comment:
       Please add type hint




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825500406



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,

Review comment:
       sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r827272477



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       I think we give the users all the necessary info with the return value. However, we may also discuss whether we really want the operator to fail in any case. It may be useful to raise an error if no file_id is returned, but that also has drawbacks, like having to include an `optional` parameter and such.
   
   Also, in any case, checking for the availability of the local files shouldn't be in this operator's scope; logging them is enough IMO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r827386301



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       I think adding a parameter related to the failure behavior here would be a good idea. If this operator succeeds but all files (or maybe even if one file?) are missing and therefore not uploaded, this is a false-positive which I think will be confusing to users. With the parameter you'd give the user an explicit option to handle this scenario. I also don't necessarily think users are always going to use the output from operators or know to look at the `XComs` pushed by the task to determine what's going on.
   
   There is an `ignore_if_missing` parameter in some places which could be a nice analog here. WDYT?
   
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py#L52
   
   https://github.com/apache/airflow/blob/c1ab8e2d7b68a31408e750129592e16432474512/airflow/providers/google/cloud/operators/bigquery.py#L1784
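   
   For illustration, a rough sketch of how such a flag could be wired up at the end of `execute` (names and message are placeholders, not a definitive implementation):
   ```python
   from typing import Sequence

   from airflow.exceptions import AirflowFailException

   # Hypothetical helper: fail the task when some files were not uploaded,
   # unless ignore_if_missing is set.
   def check_upload_results(
       local_paths: Sequence[str],
       remote_file_ids: Sequence[str],
       ignore_if_missing: bool,
   ) -> None:
       missing = len(local_paths) - len(remote_file_ids)
       if missing > 0 and not ignore_if_missing:
           raise AirflowFailException(
               f"{missing} of {len(local_paths)} files could not be uploaded to Google Drive"
           )
   ```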
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830601510



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File %s can't be found", local_path)

Review comment:
       `FileNotFoundError` is a subclass of `OSError`,
   so catching `OSError` is enough.
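   A quick, purely illustrative sketch of the point:
   ```python
   # FileNotFoundError is a subclass of OSError, so an OSError handler also
   # catches the missing-file case.
   assert issubclass(FileNotFoundError, OSError)

   try:
       open("/path/that/does/not/exist")
   except OSError as err:
       print(type(err).__name__)  # FileNotFoundError
   ```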




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068466108


   > and we have a automated way to handle it properly with source/target in the provider yaml
   
   This doesn't make the local filesystem a service, it makes the naming convention wrong.
   
   But if there is a way we already widely use, it's better to stick to it, of course.
   
   Next naming problem is about `GoogleDrive`. Should we call it `GDrive`, `Gdrive` or `GoogleDrive`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068473533


   > This doesn't make the local filesystem a service, it makes the naming convention wrong.
   > But if there is a way we already widely use, it's better to stick to it, of course.
   
   You can always take it to the mailing list and pitch your ideas/suggestions. This is an open source project, so if you feel strongly about something you can make a difference by promoting and implementing it, but until then we stick to the conventions we have :)
   
   > Next naming problem is about `GoogleDrive`. Should we call it `GDrive`, `Gdrive` or `GoogleDrive`?
   
   It should be `GoogleDrive`, because the hook is `GoogleDriveHook`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r827413996



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       So the next question is: when should it fail?
   
   When no file ids are returned at all,
   or
   when the returned list is shorter than the list of local files.
   
   As you said it may be misleading, so I think the latter one makes more sense. What do you think?
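
   Roughly what the second option could look like, as a hypothetical standalone check (names are illustrative; the operator would use Airflow's own exception instead):

   ```python
   # Hypothetical sketch: fail only when fewer remote file ids came back than
   # local paths were given.
   from typing import List

   class UploadIncompleteError(Exception):
       """Fewer files were uploaded than requested."""

   def check_upload_complete(local_paths: List[str], remote_file_ids: List[str]) -> None:
       if len(remote_file_ids) < len(local_paths):
           missing = len(local_paths) - len(remote_file_ids)
           raise UploadIncompleteError(f"{missing} file(s) could not be uploaded")

   # check_upload_complete(["a.csv", "b.csv"], ["1a2b3c"])  # would raise: 1 file missing
   ```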




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067190434


   The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830571273



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       We should avoid f-strings in logging;
   see https://blog.pilosus.org/posts/2020/01/24/python-f-strings-in-logging/
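
   A tiny standalone example of the difference (illustrative only, not the PR code):

   ```python
   # Illustrative only: %-style arguments are formatted lazily by the logging
   # module, so interpolation is skipped entirely when the level is disabled.
   import logging

   logging.basicConfig(level=logging.INFO)
   logger = logging.getLogger(__name__)

   local_path = "/tmp/report.csv"
   logger.info("Uploading file to Google Drive: %s", local_path)  # preferred: lazy formatting
   logger.info(f"Uploading file to Google Drive: {local_path}")   # formats eagerly, even if filtered out
   ```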




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825501469



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+            file_name = local_path.split('/')[-1]
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=local_path,
+                    remote_location=os.path.join(self.drive_folder, file_name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.error(f"File {local_path} can't be found")

Review comment:
       sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068480017


   > I think local_to_drive is the way to go, right?
   
   `local_to_drive` is fine


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Bowrna commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
Bowrna commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067867391


   Static checks are failing in mypy.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r826378437



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Sequence[Path],

Review comment:
       The same applies to `drive_folder` as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830595049



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File %s can't be found", local_path)

Review comment:
       `FileNotFoundError` seemed like the main error cause in my case, but it'd be better to catch all `OSError`s. How about adding `OSError` after `FileNotFoundError`?
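
   For what that ordering would look like, a minimal standalone sketch (hypothetical helper, not the PR code):

   ```python
   # Hypothetical sketch: the more specific FileNotFoundError must be listed
   # before the broader OSError, otherwise its branch would never be reached.
   import os

   def delete_local_file(path: str) -> None:
       try:
           os.remove(path)
       except FileNotFoundError:
           print(f"File can't be found: {path}")
       except OSError as err:
           print(f"An OSError occurred for {path}: {err}")

   if __name__ == "__main__":
       delete_local_file("/tmp/definitely-missing-file.txt")
   ```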




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830601617



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File %s can't be found", local_path)

Review comment:
       https://docs.python.org/3/library/exceptions.html#os-exceptions




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Add `LocalFilesystemToGoogleDriveOperator`

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r831189822



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File can't be found: %s", local_path)
+            except OSError:
+                self.log.warning("An OSError occurred for file: %s", local_path)

Review comment:
       Raising immediately would fail the DAG too early; the intention was to fail only after seeing the whole landscape. That's why we log the problems as they occur and raise at the end, if the user doesn't want to ignore them.
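
   As a rough standalone sketch of that flow (hypothetical names; the real operator would raise `AirflowFailException` rather than this placeholder):

   ```python
   # Hypothetical sketch of "log, keep going, raise at the end".
   import logging
   from typing import List

   logging.basicConfig(level=logging.WARNING)
   logger = logging.getLogger(__name__)

   class UploadFailedError(Exception):
       pass

   def upload_path(path: str) -> str:
       # Stand-in for hook.upload_file(...); raises OSError if the file is missing.
       with open(path, "rb"):
           pass
       return f"remote-id-for-{path}"

   def upload_all(local_paths: List[str], ignore_if_missing: bool = False) -> List[str]:
       remote_file_ids: List[str] = []
       for path in local_paths:
           try:
               remote_file_ids.append(upload_path(path))
           except OSError:
               logger.warning("Could not upload %s", path)  # log, but continue with the rest
       if not ignore_if_missing and len(remote_file_ids) < len(local_paths):
           raise UploadFailedError("Some files could not be uploaded")
       return remote_file_ids

   if __name__ == "__main__":
       upload_all(["/tmp/missing-a.csv", "/tmp/missing-b.csv"], ignore_if_missing=True)
   ```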




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825503425



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:

Review comment:
       sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825468164



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param api_version: version of the Google Drive API
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = ('local_paths', 'drive_folder',)
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        api_version: str = 'v3',

Review comment:
       I don't think removing it is the way to go.
   If there are multiple versions of the API, users might want to set whichever one they wish.
   My question was only about the default value of the parameter.
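
   To illustrate what I mean, a tiny hypothetical sketch of keeping the parameter with a default (stand-in function, not the hook's real constructor):

   ```python
   # Hypothetical sketch: keep api_version as a parameter with a sensible default,
   # so callers can override it if Google ships another Drive API version.
   from typing import Dict

   def make_drive_hook_kwargs(gcp_conn_id: str, api_version: str = "v3") -> Dict[str, str]:
       # Stand-in for building GoogleDriveHook(gcp_conn_id=..., api_version=...) kwargs.
       return {"gcp_conn_id": gcp_conn_id, "api_version": api_version}

   print(make_drive_hook_kwargs("google_cloud_default"))        # defaults to v3
   print(make_drive_hook_kwargs("google_cloud_default", "v2"))  # explicit override
   ```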




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068445477


   @turbaszek @eladkal @potiuk Should this technically be a transfer operator, organized under something like `airflow/providers/google/suite/transfers/local_to_gdrive.py`, and maybe called `LocalToGoogleDriveOperator`? This is effectively the opposite of the existing [`GoogleDriveToLocalOperator`](https://github.com/apache/airflow/blob/08575ddd8a72f96a3439f73e973ee9958188eb83/airflow/providers/google/cloud/transfers/gdrive_to_local.py). AIP-21 doesn't _specifically_ cover new modules, but since the naming/organization pattern already exists, maybe we should follow it for consistency? WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r826289304



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Sequence[Path],

Review comment:
       I would also allow `Sequence[str]` to support templating this field. I don't think Jinja will resolve `Path` objects.
   ```suggestion
           local_paths: Union[Sequence[Path], Sequence[str]],
   ```
   And in the constructor we need to map them to `Path` in case of strings.
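
   Something like this for the mapping (hypothetical standalone helper, not the final operator code):

   ```python
   # Hypothetical sketch of the constructor-side mapping: accept both str and Path
   # entries (so templated strings still work) and normalise everything to Path.
   from pathlib import Path
   from typing import List, Sequence, Union

   def normalise_local_paths(local_paths: Union[Sequence[Path], Sequence[str]]) -> List[Path]:
       return [Path(p) for p in local_paths]

   print(normalise_local_paths(["/tmp/a.csv", Path("/tmp/b.csv")]))
   ```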




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal edited a comment on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068461138


   > so there is no transfer between two services
   
   There is... and we have an automated way to handle it properly with source/target in the provider yaml. Example:
   https://github.com/apache/airflow/blob/16adc035b1ecdf533f44fbb3e32bea972127bb71/airflow/providers/microsoft/azure/provider.yaml#L169
   
   The operator name should be `LocalFilesystemToGoogleDrive` (unless there is some other convention for `GoogleDrive` which I'm not aware of?)
   The `LocalFilesystemTo*` prefix is the convention:
   ```
   LocalFilesystemToGCSOperator
   LocalFilesystemToS3Operator
   LocalFilesystemToADLSOperator
   LocalFilesystemToWasbOperator
   ```
   The file should be called `local_to_drive.py` and be placed in the transfers folder
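   For reference, with that placement a DAG would import the operator roughly like this (using the class name the PR ends up with after the rename):
   ```python
   from airflow.providers.google.suite.transfers.local_to_drive import (
       LocalFilesystemToGoogleDriveOperator,
   )
   ```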
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal merged pull request #22219: Add `LocalFilesystemToGoogleDriveOperator`

Posted by GitBox <gi...@apache.org>.
eladkal merged pull request #22219:
URL: https://github.com/apache/airflow/pull/22219


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825490457



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:

Review comment:
       ```suggestion
       def execute(self, context: "Context") -> Sequence[str]:
   ```
   
   You will need:
   ```
   from typing import TYPE_CHECKING
   
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   ```
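   Put together, a minimal sketch of the pattern (a stripped-down operator, not the one in this PR):
   ```python
   from typing import TYPE_CHECKING, Sequence

   from airflow.models import BaseOperator

   if TYPE_CHECKING:
       # Imported only for static type checking, so there is no runtime cost
       from airflow.utils.context import Context


   class ExampleOperator(BaseOperator):
       def execute(self, context: "Context") -> Sequence[str]:
           # The quoted "Context" annotation is resolved by type checkers only
           return []
   ```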




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r825502032



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from typing import Any, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file path strings
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths,
+        drive_folder: str,
+        gcp_conn_id: str,
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Any) -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+            file_name = local_path.split('/')[-1]

Review comment:
       How about changing both local_paths and drive_folder to use `Path`?
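   For illustration, this is roughly what the `Path`-based composition would look like (the folder and file names are made up):
   ```python
   from pathlib import Path

   drive_folder = "reports/daily"
   local_path = "/tmp/data/2022-03-13.csv"

   # Join the Drive folder with just the file name, ignoring the local directory part
   remote_location = Path(drive_folder) / Path(local_path).name
   print(remote_location)  # reports/daily/2022-03-13.csv
   ```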




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067309919


   How about this? @potiuk @turbaszek 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc removed a comment on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc removed a comment on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1067909036


   @potiuk any ideas how to solve the current failure? It's working fine, but mypy doesn't like type casting, I guess?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ulsc commented on pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
ulsc commented on pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#issuecomment-1068477814


   > until then we stick to the conventions we have :)
   
   It still doesn't make the local filesystem a service though :) The argument you're backing and the argument we're talking about are different.
   
   Anyways, how about the file's name?
   
   Should it be `local_to_drive`, `local_to_google_drive`, `local_to_gdrive` or `local_to_googledrive`?
   
   I think `local_to_drive` is the way to go, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Add `LocalFilesystemToGoogleDriveOperator`

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r831180470



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File can't be found: %s", local_path)
+            except OSError:
+                self.log.warning("An OSError occurred for file: %s", local_path)

Review comment:
       merged this one too fast :) 
   We should do something like:
   
   ```
           except FileNotFoundError as e:
               self.log.info('File not found: %s: %s', local_path, e)
               raise
           except OSError as e:
               self.log.info('An OSError occurred for file: %s: %s', local_path, e)
               raise
   ```
   
   or
   
   ```
   except OSError as e:
       raise AirflowException(
           f'Exception: There was exception related to file in path {local_path}  '
           f'full error is ({e}).'
       )
   ```
   Would you mind opening a follow-up PR?
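   As an illustration only, one way a follow-up could combine failing the task with the existing `ignore_if_missing` flag, reusing the names already defined in the quoted `execute` method (a sketch, not the merged code):
   ```python
   missing_files = []

   for local_path in self.local_paths:
       try:
           remote_file_id = hook.upload_file(
               local_location=str(local_path),
               remote_location=str(Path(self.drive_folder) / Path(local_path).name),
               chunk_size=self.chunk_size,
               resumable=self.resumable,
           )
           remote_file_ids.append(remote_file_id)
       except OSError as e:
           self.log.warning("An OSError occurred for file %s: %s", local_path, e)
           missing_files.append(local_path)

   # Fail the task only when files are missing and the user asked for strict behaviour
   if missing_files and not self.ignore_if_missing:
       raise AirflowFailException(f"Some files could not be uploaded: {missing_files}")
   ```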




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830601510



##########
File path: airflow/providers/google/suite/transfers/local_to_drive.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+
+from airflow.exceptions import AirflowFailException
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LocalFilesystemToGoogleDriveOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:LocalFilesystemToGoogleDriveOperator`
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param ignore_if_missing: if True, then don't fail even if all files
+        can't be uploaded.
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        ignore_if_missing: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.ignore_if_missing = ignore_if_missing
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info("Uploading file to Google Drive: %s", local_path)
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info("Deleted local file: %s", local_path)
+            except FileNotFoundError:
+                self.log.warning("File %s can't be found", local_path)

Review comment:
       `FileNotFoundError` inherits from `OSError`.
   So catching `OSError` is enough
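   A quick way to confirm this:
   ```python
   print(issubclass(FileNotFoundError, OSError))  # True

   try:
       open("/no/such/file")
   except OSError as exc:
       # A missing file raises FileNotFoundError, which this clause still catches
       print(type(exc).__name__)  # FileNotFoundError
   ```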




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #22219: Gdrive upload operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #22219:
URL: https://github.com/apache/airflow/pull/22219#discussion_r830111581



##########
File path: airflow/providers/google/suite/operators/drive.py
##########
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This file contains Google Drive operators"""
+
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GoogleDriveUploadOperator(BaseOperator):
+    """
+    Upload a list of files to a Google Drive folder.
+    This operator uploads a list of local files to a Google Drive folder.
+    The local files can be deleted after upload (optional)
+
+    :param local_paths: Python list of local file paths
+    :param drive_folder: path of the Drive folder
+    :param gcp_conn_id: Airflow Connection ID for GCP
+    :param delete: should the local files be deleted after upload?
+    :param chunk_size: File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True. Pass in a value of -1 if the file is to be
+        uploaded as a single chunk. Note that Google App Engine has a 5MB limit
+        on request size, so you should never set your chunk size larger than 5MB,
+        or to -1.
+    :param resumable: True if this is a resumable upload. False means upload
+        in a single request.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account
+    :return: Remote file ids after upload
+    :rtype: Sequence[str]
+    """
+
+    template_fields = (
+        'local_paths',
+        'drive_folder',
+    )
+
+    def __init__(
+        self,
+        local_paths: Union[Sequence[Path], Sequence[str]],
+        drive_folder: Union[Path, str],
+        gcp_conn_id: str = "google_cloud_default",
+        delete: bool = False,
+        chunk_size: int = 100 * 1024 * 1024,
+        resumable: bool = False,
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.local_paths = local_paths
+        self.drive_folder = drive_folder
+        self.gcp_conn_id = gcp_conn_id
+        self.delete = delete
+        self.chunk_size = chunk_size
+        self.resumable = resumable
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> Sequence[str]:
+        hook = GoogleDriveHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        remote_file_ids = []
+
+        for local_path in self.local_paths:
+            self.log.info(f'Uploading file to Google Drive: {local_path}')
+
+            try:
+                remote_file_id = hook.upload_file(
+                    local_location=str(local_path),
+                    remote_location=str(Path(self.drive_folder) / Path(local_path).name),
+                    chunk_size=self.chunk_size,
+                    resumable=self.resumable,
+                )
+
+                remote_file_ids.append(remote_file_id)
+
+                if self.delete:
+                    os.remove(local_path)
+                    self.log.info(f'Deleted local file: {local_path}')
+            except FileNotFoundError:
+                self.log.warning(f"File {local_path} can't be found")

Review comment:
       I agree. If _any_ of the files are missing, it makes sense to fail the task, but this should be configurable so that missing files can be allowed.
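   For example, a hypothetical task definition once such a flag exists (the paths and task_id are made up, and the class name is the one the operator ends up with after the rename):
   ```python
   upload_reports = LocalFilesystemToGoogleDriveOperator(
       task_id="upload_reports",
       local_paths=["/tmp/report_a.csv", "/tmp/report_b.csv"],
       drive_folder="reports/daily",
       gcp_conn_id="google_cloud_default",
       ignore_if_missing=True,  # tolerate missing local files instead of failing the task
   )
   ```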




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org