Posted to commits@airflow.apache.org by "moiseenkov (via GitHub)" <gi...@apache.org> on 2023/02/10 10:54:51 UTC

[GitHub] [airflow] moiseenkov opened a new pull request, #29462: Add deferrable mode for S3ToGCSOperator

moiseenkov opened a new pull request, #29462:
URL: https://github.com/apache/airflow/pull/29462

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   




[GitHub] [airflow] kaxil commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116198006


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -104,6 +127,7 @@ class S3ToGCSOperator(S3ListOperator):
         "google_impersonation_chain",
     )
     ui_color = "#e09411"
+    transfer_job_max_files_number = 1000

Review Comment:
   Why hard-code? Should we make it configurable?





[GitHub] [airflow] Lee-W commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1621296575

   @potiuk Sure! Sorry for the late reply. I'm already working on it.




[GitHub] [airflow] moiseenkov commented on pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1566741611

   Hi team,
   
   I rebased this branch, updated dependencies and addressed all comments. Could you please re-review it?
   @kaxil , @pankajastro 




[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1104505128


##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(
+        self, job_names: list[str], project_id: str | None = None, polling_interval_seconds: int = 10

Review Comment:
   Thanks for the advice. The name is updated.





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117214258


##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        "airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):
+
+    class AsyncMock(mock.MagicMock):
+        async def __call__(self, *args, **kwargs):
+            return super(AsyncMock, self).__call__(*args, **kwargs)
+
+else:
+    from unittest.mock import AsyncMock
+
+
+class TestCloudDataTransferServiceAsyncHook:
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_storage_transfer_service"

Review Comment:
   Agree, thanks for noticing, fixed.





[GitHub] [airflow] potiuk commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1575768402

   Rebased it after addressing some flaky mssql tooling issues in main; let's see if it helps




[GitHub] [airflow] kaxil commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1104379868


##########
generated/provider_dependencies.json:
##########
@@ -359,6 +359,7 @@
       "google-cloud-secret-manager>=0.2.0,<2.0.0",
       "google-cloud-spanner>=1.10.0,<2.0.0",
       "google-cloud-speech>=0.36.3,<2.0.0",
+      "google-cloud-storage-transfer<=1.4.0",

Review Comment:
   Shouldn't pin unless major version





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1104510922


##########
generated/provider_dependencies.json:
##########
@@ -359,6 +359,7 @@
       "google-cloud-secret-manager>=0.2.0,<2.0.0",
       "google-cloud-spanner>=1.10.0,<2.0.0",
       "google-cloud-speech>=0.36.3,<2.0.0",
+      "google-cloud-storage-transfer<=1.4.0",

Review Comment:
   Unfortunately, later versions of `google-cloud-storage-transfer` lead to a dependency conflict. It can be upgraded together with the other Google packages. I left a comment about it in `airflow/providers/google/provider.yaml`.





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116734826


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -104,6 +127,7 @@ class S3ToGCSOperator(S3ListOperator):
         "google_impersonation_chain",
     )
     ui_color = "#e09411"
+    transfer_job_max_files_number = 1000

Review Comment:
   Thank you for noticing. It's a good point, but I think it's better to leave it hard-coded, because:
   1. This attribute exists only because of a Google Storage Transfer Service API limitation on the field `includePrefixes[]` (docs: https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions). We can't increase it, because that might cause "400 Bad Request" responses from the API, and decreasing it is also not a good idea, because it might increase the number of API calls. That's why I don't see a use case for changing it unless the API changes.
   2. Moving this attribute into the `S3ToGCSOperator`'s parameter list also doesn't look right to me, because it is part of low-level logic that is outside the operator's scope. This new parameter wouldn't influence the operator's result and would therefore be of little use to users. Moreover, once we create a new parameter we have to maintain it, and it won't be easy to remove in the future.
   
   That's why, from my perspective, exposing it to users won't bring any value. However, it is always possible to monkey-patch it if needed. WDYT?
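
   A minimal, hypothetical sketch of that monkey-patching approach, assuming `transfer_job_max_files_number` stays a plain class attribute on `S3ToGCSOperator` as in the diff above (not generally recommended, and values other than 1000 may either hit the API limit or increase the number of transfer jobs):

   ```python
   # Hypothetical override of the hard-coded chunk size; not part of the PR itself.
   from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

   # Each Storage Transfer Service job then carries at most 500 include prefixes,
   # so the operator submits roughly twice as many jobs for the same file list.
   S3ToGCSOperator.transfer_job_max_files_number = 500
   ```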





[GitHub] [airflow] moiseenkov commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1621318954

   > We had a recent change in how we treat the default deferrable setting and we need to adjust this one too: #31712
   > 
   > One more case in point @Lee-W that we need to add a pre-commit checking the deferrable convention, otherwise we will end up with all kinds of mess where some deferrable parameters will have default values following the convention and some not.
   > 
   > @VladaZakharova - can you please update the change to follow this pattern? @Lee-W -> will it be possible that you add such a pre-commit ([#32355 (comment)](https://github.com/apache/airflow/pull/32355#issuecomment-1621100211))?
   
   Thanks for the update! I will update the PR and ping you afterwards.
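
   A minimal sketch of the convention referenced above, assuming it is the `[operators] default_deferrable` configuration option introduced around #31712 (the final signature in the operator may differ):

   ```python
   # Hypothetical illustration of the default-deferrable convention; not the merged code.
   from airflow.configuration import conf


   class SomeDeferrableOperator:  # stand-in for S3ToGCSOperator
       def __init__(self, *, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs):
           self.deferrable = deferrable
   ```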




[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117162070


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Speaking about consistency, I agree with you, thanks for noticing - I fixed that.
   Regarding the other questions, I'm not sure I understand what exactly you mean. Could you please describe it in more detail?





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117157372


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Agree, thanks. Done.





[GitHub] [airflow] kaxil commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1104381382


##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(
+        self, job_names: list[str], project_id: str | None = None, polling_interval_seconds: int = 10

Review Comment:
   `polling_interval_seconds` -- should we sync with others and have `poll_interval` instead?





[GitHub] [airflow] potiuk merged pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #29462:
URL: https://github.com/apache/airflow/pull/29462




[GitHub] [airflow] github-actions[bot] commented on pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1560279560

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116686673


##########
generated/provider_dependencies.json:
##########
@@ -359,6 +359,7 @@
       "google-cloud-secret-manager>=0.2.0,<2.0.0",
       "google-cloud-spanner>=1.10.0,<2.0.0",
       "google-cloud-speech>=0.36.3,<2.0.0",
+      "google-cloud-storage-transfer<=1.4.0",

Review Comment:
   Unfortunately, `google-cloud-storage-transfer` versions 1.4.1+ require higher versions of the `protobuf` and `proto-plus` libraries, which conflict with other Google packages. However, all of these packages should be upgraded soon, and `google-cloud-storage-transfer` will be among them.





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117191833


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,
+                        "secret_access_key": config.aws_secret_access_key,
+                    },
+                },
+                OBJECT_CONDITIONS: {
+                    "include_prefixes": [],
+                },
+                GCS_DATA_SINK: {BUCKET_NAME: gcs_bucket, PATH: gcs_prefix},
+                TRANSFER_OPTIONS: {
+                    "overwrite_objects_already_existing_in_sink": self.replace,
+                },
+            },
+        }
+
+        # max size of the field 'transfer_job.transfer_spec.object_conditions.include_prefixes' is 1000,
+        # that's why we submit multiple jobs transferring 1000 files each.
+        # See documentation below
+        # https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+        chunk_size = self.transfer_job_max_files_number
+        job_names = []
+        transfer_hook = self.get_transfer_hook()
+        for i in range(0, len(files), chunk_size):
+            files_chunk = files[i : i + chunk_size]
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS]["include_prefixes"] = files_chunk
+            job = transfer_hook.create_transfer_job(body=body)

Review Comment:
   As I see it, it's better not to perform clean-up or undo operations if the job fails, because the current implementation is transparent for users: they can see where the transfer got interrupted, which files were copied and which are still waiting, and they can find the failed job and examine it to get a better understanding of what happened.





[GitHub] [airflow] kaxil commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116199099


##########
docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst:
##########
@@ -37,6 +37,16 @@ to transfer data from Amazon S3 to Google Cloud Storage.
     :start-after: [START howto_transfer_s3togcs_operator]
     :end-before: [END howto_transfer_s3togcs_operator]
 
+There is a possibility to start S3ToGCSOperator asynchronously using deferrable mode. To do so just add parameter
+``deferrable=True`` into the operator call. Under the hood it will delegate data transfer to Google Cloud Storage
+Transfer Service. By changing parameter ``polling_interval_seconds=10`` you can control frequency of polling a transfer

Review Comment:
   ```suggestion
   Transfer Service. By changing parameter ``polling_interval=10`` you can control frequency of polling a transfer
   ```
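
   A hedged usage sketch based on the docs excerpt above; the parameter names (`deferrable`, `poll_interval`) follow the review discussion and the bucket names are placeholders, so the final API may differ:

   ```python
   # Hypothetical DAG snippet illustrating deferrable mode; values are examples only.
   from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

   s3_to_gcs_deferrable = S3ToGCSOperator(
       task_id="s3_to_gcs_deferrable",
       bucket="example-s3-bucket",                # source S3 bucket (placeholder)
       dest_gcs="gs://example-gcs-bucket/data/",  # destination GCS URL (placeholder)
       deferrable=True,   # delegate the copy to Cloud Storage Transfer Service
       poll_interval=10,  # seconds between polls of the transfer operation status
   )
   ```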





[GitHub] [airflow] potiuk commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1161136074


##########
generated/provider_dependencies.json:
##########
@@ -359,6 +359,7 @@
       "google-cloud-secret-manager>=0.2.0,<2.0.0",
       "google-cloud-spanner>=1.10.0,<2.0.0",
       "google-cloud-speech>=0.36.3,<2.0.0",
+      "google-cloud-storage-transfer<=1.4.0",

Review Comment:
   We should add a comment about it 





[GitHub] [airflow] potiuk closed pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #29462: Deferrable mode for S3ToGCSOperator
URL: https://github.com/apache/airflow/pull/29462




[GitHub] [airflow] pankajastro commented on a diff in pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1243635692


##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id.
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):

Review Comment:
   `poll_interval` is missing from the docstring
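
   A possible amendment, sketched here with hypothetical wording, that adds the missing parameter and the blank line requested elsewhere in this review:

   ```python
   # Hypothetical docstring fix; not the merged code.
   from airflow.triggers.base import BaseTrigger


   class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
       """
       StorageTransferJobTrigger runs on the trigger worker to perform a Cloud Storage Transfer job.

       :param job_names: List of transfer job names
       :param project_id: GCP project id
       :param poll_interval: Interval in seconds between polls of the transfer operation status
       """
   ```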



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id.
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."

Review Comment:
   This string looks long; maybe we should do
   `f"{self.__class__.__module__}.{self.__class__.__qualname__}"`
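
   A small sketch of that idea, with hypothetical names, showing how the classpath can be derived from the class itself instead of being hard-coded:

   ```python
   # Hypothetical illustration of building the serialized classpath dynamically.
   from __future__ import annotations

   from typing import Any


   class ExampleJobsTrigger:  # stand-in for CloudStorageTransferServiceCreateJobsTrigger
       def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
           self.job_names = job_names
           self.project_id = project_id
           self.poll_interval = poll_interval

       def serialize(self) -> tuple[str, dict[str, Any]]:
           # Derive the classpath from the class instead of a long hard-coded string.
           classpath = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
           return classpath, {
               "project_id": self.project_id,
               "job_names": self.job_names,
               "poll_interval": self.poll_interval,
           }
   ```

   With this approach the serialized path stays correct even if the class is renamed or moved, as long as it is always serialized from the concrete class.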



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+    :param job_names: List of transfer jobs names

Review Comment:
   ```suggestion
   
       :param job_names: List of transfer jobs names
   ```
   This applies in other places also. Please keep an empty line between the description and the params; it looks better in the UI.



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id.
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent."""
+        async_hook: CloudDataTransferServiceAsyncHook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")
+            jobs_completed_successfully = 0
+            try:
+                jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
+                jobs, awaitable_operations = [], []
+                async for job in jobs_pager:
+                    operation = async_hook.get_latest_operation(job)
+                    jobs.append(job)
+                    awaitable_operations.append(operation)
+
+                operations: list[TransferOperation] = await asyncio.gather(*awaitable_operations)
+
+                for job, operation in zip(jobs, operations):
+                    if operation is None:
+                        yield TriggerEvent(
+                            {
+                                "status": "error",
+                                "message": f"Transfer job {job.name} has no latest operation.",
+                            }
+                        )
+                        return
+                    elif operation.status == TransferOperation.Status.SUCCESS:
+                        jobs_completed_successfully += 1
+                    elif operation.status in (
+                        TransferOperation.Status.FAILED,
+                        TransferOperation.Status.ABORTED,
+                    ):
+                        yield TriggerEvent(
+                            {
+                                "status": "error",
+                                "message": f"Transfer operation {operation.name} failed with status "
+                                f"{TransferOperation.Status(operation.status).name}",
+                            }
+                        )
+                        return
+            except (GoogleAPIError, AirflowException) as ex:
+                yield TriggerEvent(dict(status="error", message=str(ex)))
+                return
+
+            jobs_total = len(self.job_names)
+            self.log.info("Transfer jobs completed: %s of %s", jobs_completed_successfully, jobs_total)
+            if jobs_completed_successfully == jobs_total:

Review Comment:
   Just want to point out that the values of jobs_total and jobs_completed_successfully can be invalidated if a trigger dies and another trigger picks up the job





[GitHub] [airflow] moiseenkov commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1621421142

   Hi, @potiuk , @Lee-W ,
   
   I updated the PR: 
   https://github.com/apache/airflow/pull/29462/files#diff-3e4eedf131305f589cec193af2b406b57a5eab1a3db181272996c7e4d03e6cc6R156




[GitHub] [airflow] moiseenkov closed pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov closed pull request #29462: Add deferrable mode for S3ToGCSOperator
URL: https://github.com/apache/airflow/pull/29462




[GitHub] [airflow] moiseenkov commented on pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1441758782

   Hi @kaxil,
   Could you please review my fixes?




[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117156672


##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        "airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):

Review Comment:
   Thanks for noticing. Done.



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):

Review Comment:
   Done.





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117156191


##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")

Review Comment:
   Well, there is a log message that reports how many jobs have completed just a few lines below, after the try..except block.





[GitHub] [airflow] pankajastro commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116213336


##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -490,3 +494,54 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    def get_jobs(self, job_names: list[str]):

Review Comment:
   Can we make this an async function?
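
   For example, a minimal async version might look roughly like this (a sketch only; it assumes `get_conn()` returns a `StorageTransferServiceAsyncClient` and that the list filter is the documented JSON string with "projectId" / "jobNames" keys):

   ```
   import json

   from google.cloud.storage_transfer_v1.types import ListTransferJobsRequest


   async def get_jobs(self, job_names: list[str]):
       """Return an async pager over the given transfer jobs (sketch)."""
       client = self.get_conn()
       filter_ = json.dumps({"projectId": self.project_id, "jobNames": job_names})
       return await client.list_transfer_jobs(request=ListTransferJobsRequest(filter=filter_))
   ```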



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""

Review Comment:
   ```suggestion
           """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
           
   ```



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")

Review Comment:
   Adding more meaningful log messages here would help, for example how many jobs are done.
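
   Something along these lines, for example (illustrative only, reusing the names already present in the trigger):

   ```
   self.log.info(
       "Transfer jobs completed successfully: %s of %s",
       jobs_completed_successful,
       len(self.job_names),
   )
   ```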



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        "airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):

Review Comment:
   Let's reuse it from https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/utils/compat.py



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):

Review Comment:
   reuse it from https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/utils/compat.py



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Keep these keys consistent, i.e. either use an imported constant for both keys or a hardcoded string for both, like "secret_access_key".



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        "airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):
+
+    class AsyncMock(mock.MagicMock):
+        async def __call__(self, *args, **kwargs):
+            return super(AsyncMock, self).__call__(*args, **kwargs)
+
+else:
+    from unittest.mock import AsyncMock
+
+
+class TestCloudDataTransferServiceAsyncHook:
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_storage_transfer_service"

Review Comment:
   This patch path is really long; maybe you can keep a constant:
   ```
   base_path = "airflow.providers.google.cloud.hooks.cloud_storage_transfer_service"
   
   @mock.patch(f"{base_path}.storage_transfer_v1.StorageTransferServiceAsyncClient")
   def test_(...):
      ...
   ```



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   What if roleArn is given?



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Shall we also pass the `path` param? WDYT?



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:

Review Comment:
   Shall we check this in the transfer operator's `__init__` instead? That way it would fail early.
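
   For instance, the check could move into the operator's constructor (a sketch of `S3ToGCSOperator.__init__` with the other parameters omitted; not the actual PR code):

   ```
   def __init__(self, *, poll_interval: int = 10, deferrable: bool = False, **kwargs) -> None:
       super().__init__(**kwargs)
       if deferrable and poll_interval <= 0:
           # Fail at task instantiation time instead of at execution time.
           raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
       self.poll_interval = poll_interval
       self.deferrable = deferrable
   ```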



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,
+                        "secret_access_key": config.aws_secret_access_key,
+                    },
+                },
+                OBJECT_CONDITIONS: {
+                    "include_prefixes": [],
+                },
+                GCS_DATA_SINK: {BUCKET_NAME: gcs_bucket, PATH: gcs_prefix},
+                TRANSFER_OPTIONS: {
+                    "overwrite_objects_already_existing_in_sink": self.replace,
+                },
+            },
+        }
+
+        # max size of the field 'transfer_job.transfer_spec.object_conditions.include_prefixes' is 1000,
+        # that's why we submit multiple jobs transferring 1000 files each.
+        # See documentation below
+        # https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+        chunk_size = self.transfer_job_max_files_number
+        job_names = []
+        transfer_hook = self.get_transfer_hook()
+        for i in range(0, len(files), chunk_size):
+            files_chunk = files[i : i + chunk_size]
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS]["include_prefixes"] = files_chunk
+            job = transfer_hook.create_transfer_job(body=body)

Review Comment:
   If a job fails in the middle, do we need to clean up?



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")
+            jobs_completed_successful = 0
+            try:
+                jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
+                jobs, awaitable_operations = [], []
+                async for job in jobs_pager:
+                    operation = async_hook.get_latest_operation(job)
+                    jobs.append(job)
+                    awaitable_operations.append(operation)
+
+                operations: list[TransferOperation] = await asyncio.gather(*awaitable_operations)
+
+                for job, operation in zip(jobs, operations):
+                    if operation is None:
+                        yield TriggerEvent(

Review Comment:
   Once it fails, what will happen to the job that you created in `create_transfer_job`?





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117162070


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Speaking of consistency, I agree with you; thanks for noticing, I fixed that.
   Regarding the other questions, I'm not sure I understand what exactly you mean. Could you please describe it in more detail?





[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117213518


##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")
+            jobs_completed_successful = 0
+            try:
+                jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
+                jobs, awaitable_operations = [], []
+                async for job in jobs_pager:
+                    operation = async_hook.get_latest_operation(job)
+                    jobs.append(job)
+                    awaitable_operations.append(operation)
+
+                operations: list[TransferOperation] = await asyncio.gather(*awaitable_operations)
+
+                for job, operation in zip(jobs, operations):
+                    if operation is None:
+                        yield TriggerEvent(

Review Comment:
   If one of the jobs fails, the trigger yields a TriggerEvent with information about the failure. Once this event is handled, the DAG run is marked as failed. Each created transfer job remains in the Google Cloud Storage Transfer Service. All jobs run independently, so a failed job won't interfere with healthy ones.
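
   For illustration, here is a minimal sketch of that failure path; the event payload keys and the `execute_complete` handling below are assumptions for the example, not the exact code from this PR:

   ```python
   from __future__ import annotations

   from airflow.exceptions import AirflowException
   from airflow.triggers.base import TriggerEvent
   from google.cloud.storage_transfer_v1.types import TransferOperation


   def event_for_failed_operation(operation: TransferOperation) -> TriggerEvent | None:
       # Trigger side: turn a failed transfer operation into an error event.
       if operation.status == TransferOperation.Status.FAILED:
           return TriggerEvent(
               {"status": "error", "message": f"Transfer operation {operation.name} failed"}
           )
       return None


   def execute_complete(self, context, event: dict) -> None:
       # Operator side: the method named in defer(method_name="execute_complete")
       # raises on an error event, which fails the task and hence the DAG run.
       if event["status"] == "error":
           raise AirflowException(event["message"])
       self.log.info(event["message"])
   ```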



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Lee-W commented on a diff in pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1254101538


##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):

Review Comment:
   nitpick
   ```suggestion
       def __init__(self, project_id: str | None = None, **kwargs: Any) -> None:
   ```



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: storage_transfer_v1.StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):

Review Comment:
   nitpick: could we please add a return type annotation here
   



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: storage_transfer_v1.StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service.
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:

Review Comment:
   Do we need to catch the exception here? Should we handle it where this function is called instead? It might be a bit confusing to call this function and get an `AirflowException` instead of a connection-related one.
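
   For example, a rough sketch of the alternative (not code from this PR): either let `GoogleAuthError` propagate as-is, or chain it so the original connection-related cause stays visible in the traceback:

   ```python
   from google.auth.exceptions import GoogleAuthError
   from google.cloud import storage_transfer_v1

   from airflow.exceptions import AirflowException


   def get_conn(self) -> storage_transfer_v1.StorageTransferServiceAsyncClient:
       """Return the async Storage Transfer client, creating it lazily."""
       if not self._client:
           try:
               self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
           except GoogleAuthError as ex:
               # Chaining keeps the original auth error attached to the Airflow one.
               raise AirflowException("Failed to create Storage Transfer client") from ex
       return self._client
   ```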



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud Storage Transfer job.
+
+    :param job_names: List of transfer jobs names.
+    :param project_id: GCP project id.
+    :param poll_interval: Interval in seconds between polls.
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10):

Review Comment:
   nitpick
   
   ```suggestion
       def __init__(self, job_names: list[str], project_id: str | None = None, poll_interval: int = 10) -> None:
   ```



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
         if self.apply_gcs_prefix:
             return self.prefix + s3_object
         return s3_object
+
+    def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):

Review Comment:
   nitpick
   ```suggestion
       def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> None:
   ```



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: storage_transfer_v1.StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service.
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    async def get_jobs(self, job_names: list[str]):
+        """
+        Gets the latest state of a long-running operations in Google Storage Transfer Service.
+
+        :param job_names: (Required) List of names of the jobs to be fetched.
+        :return: Object that yields Transfer jobs.
+        """
+        client = self.get_conn()
+        jobs_list_request = storage_transfer_v1.ListTransferJobsRequest(
+            filter=json.dumps(dict(project_id=self.project_id, job_names=job_names))
+        )
+        return await client.list_transfer_jobs(request=jobs_list_request)
+
+    async def get_latest_operation(self, job: storage_transfer_v1.TransferJob) -> Message | None:
+        """
+        Gets the latest operation of the given TransferJob instance.
+
+        :param job: Transfer job instance.
+        :return: The latest job operation.
+        """
+        latest_operation_name = job.latest_operation_name
+        if latest_operation_name:
+            client = self.get_conn()
+            response_operation = await client.transport.operations_client.get_operation(latest_operation_name)
+            operation = storage_transfer_v1.TransferOperation.deserialize(response_operation.metadata.value)
+            return operation
+        return None

Review Comment:
   I think we don't need the explicit `return None` here; the function already returns `None` when it falls through.
   
   ```suggestion
   ```



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
         if self.apply_gcs_prefix:
             return self.prefix + s3_object
         return s3_object
+
+    def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        if s3_objects:
+            dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+            for obj in s3_objects:
+                # GCS hook builds its own in-memory file, so we have to create
+                # and pass the path
+                file_object = s3_hook.get_key(obj, self.bucket)
+                with NamedTemporaryFile(mode="wb", delete=True) as file:
+                    file_object.download_fileobj(file)
+                    file.flush()
+                    gcs_file = self.s3_to_gcs_object(s3_object=obj)
+                    gcs_hook.upload(dest_gcs_bucket, gcs_file, file.name, gzip=self.gzip)
+
+            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(s3_objects))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS."""
+        if len(files) <= 0:

Review Comment:
   Should we just go with an empty check?
   
   ```suggestion
           if not len(files):
   ```



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -466,3 +472,53 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: storage_transfer_v1.StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service.
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    async def get_jobs(self, job_names: list[str]):

Review Comment:
   nitpick: could we please add a return type annotation here
   



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -221,3 +246,104 @@ def gcs_to_s3_object(self, gcs_object: str) -> str:
         if self.apply_gcs_prefix:
             return self.prefix + s3_object
         return s3_object
+
+    def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        if s3_objects:
+            dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+            for obj in s3_objects:
+                # GCS hook builds its own in-memory file, so we have to create
+                # and pass the path
+                file_object = s3_hook.get_key(obj, self.bucket)
+                with NamedTemporaryFile(mode="wb", delete=True) as file:
+                    file_object.download_fileobj(file)
+                    file.flush()
+                    gcs_file = self.s3_to_gcs_object(s3_object=obj)
+                    gcs_hook.upload(dest_gcs_bucket, gcs_file, file.name, gzip=self.gzip)
+
+            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(s3_objects))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):

Review Comment:
   nitpick
   
   ```suggestion
       def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> None:
   ```



##########
tests/system/providers/google/cloud/gcs/example_s3_to_gcs_async.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_s3_to_gcs"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+GCS_BUCKET_URL = f"gs://{BUCKET_NAME}/"
+UPLOAD_FILE = "/tmp/example-file.txt"
+PREFIX = "TESTS"
+
+
+@task(task_id="upload_file_to_s3")
+def upload_file():
+    """A callable to upload file to AWS bucket"""
+    s3_hook = S3Hook()
+    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=BUCKET_NAME)
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "s3"],
+) as dag:
+    create_s3_bucket = S3CreateBucketOperator(
+        task_id="create_s3_bucket", bucket_name=BUCKET_NAME, region_name="us-east-1"
+    )
+
+    create_gcs_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=BUCKET_NAME,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [START howto_transfer_s3togcs_operator_async]

Review Comment:
   ```suggestion
   
       # [START howto_transfer_s3togcs_operator_async]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1621298593

   > @potiuk Sure! Sorry for the late reply. I'm already working on it.
   
   Cool! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pankajastro commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1152427855


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Hmm, I was talking about the other AWS params: https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#AwsS3Data
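
   For reference, a rough sketch of what exposing those extra `AwsS3Data` fields could look like in the request body; the `path` and `roleArn` wiring below is hypothetical, and which of these the operator should actually support is exactly the open question:

   ```python
   # Keys follow the REST AwsS3Data spec linked above (camelCase shown for clarity).
   body["transferSpec"]["awsS3DataSource"] = {
       "bucketName": self.bucket,
       "awsAccessKey": {
           "accessKeyId": config.aws_access_key_id,
           "secretAccessKey": config.aws_secret_access_key,
       },
       # Optional fields the operator does not set today (illustrative only):
       "path": self.prefix,   # restrict the transfer to objects under this prefix
       "roleArn": role_arn,   # federated access instead of static access keys
   }
   ```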



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116199803


##########
generated/provider_dependencies.json:
##########
@@ -359,6 +359,7 @@
       "google-cloud-secret-manager>=0.2.0,<2.0.0",
       "google-cloud-spanner>=1.10.0,<2.0.0",
       "google-cloud-speech>=0.36.3,<2.0.0",
+      "google-cloud-storage-transfer<=1.4.0",

Review Comment:
   That will just create more issues in the future though. Could `<2` work?
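
   For example, a `<2` pin in `generated/provider_dependencies.json` would simply be (illustrative, matching the style of the neighbouring entries):

   ```
   "google-cloud-storage-transfer<2.0.0",
   ```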



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116747490


##########
docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst:
##########
@@ -37,6 +37,16 @@ to transfer data from Amazon S3 to Google Cloud Storage.
     :start-after: [START howto_transfer_s3togcs_operator]
     :end-before: [END howto_transfer_s3togcs_operator]
 
+There is a possibility to start S3ToGCSOperator asynchronously using deferrable mode. To do so just add parameter
+``deferrable=True`` into the operator call. Under the hood it will delegate data transfer to Google Cloud Storage
+Transfer Service. By changing parameter ``polling_interval_seconds=10`` you can control frequency of polling a transfer

Review Comment:
   Thanks for noticing, I updated the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] moiseenkov commented on a diff in pull request #29462: Add deferrable mode for S3ToGCSOperator

Posted by "moiseenkov (via GitHub)" <gi...@apache.org>.
moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117151252


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS"""
+        if self.poll_interval <= 0:

Review Comment:
   Good point! Fixed.



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -490,3 +494,54 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    def get_jobs(self, job_names: list[str]):

Review Comment:
   Sure, I also updated the unit tests for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1575770675

   cc: @kosteev @bjankie1 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] VladaZakharova commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1630535305

   @potiuk 
   Hi there!
   Could we recheck this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ahidalgob commented on a diff in pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "ahidalgob (via GitHub)" <gi...@apache.org>.
ahidalgob commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1243913011


##########
docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst:
##########
@@ -37,6 +37,16 @@ to transfer data from Amazon S3 to Google Cloud Storage.
     :start-after: [START howto_transfer_s3togcs_operator]
     :end-before: [END howto_transfer_s3togcs_operator]
 
+There is a possibility to start S3ToGCSOperator asynchronously using deferrable mode. To do so just add parameter
+``deferrable=True`` into the operator call. Under the hood it will delegate data transfer to Google Cloud Storage
+Transfer Service. By changing parameter ``poll_interval=10`` you can control frequency of polling a transfer
+job status.
+
+.. exampleinclude::/../tests/system/providers/google/cloud/gcs/example_s3_to_gcs_async.py
+    :language: python
+    :start-after: [START howto_transfer_s3togcs_operator]

Review Comment:
   I think we need to use a different tag here, don't we? Something like `howto_transfer_s3togcs_operator_async`, and `example_s3_to_gcs_async.py` should be updated to define it as well.
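
   A minimal sketch of what the updated directive could look like, assuming the async example file defines matching `[START/END howto_transfer_s3togcs_operator_async]` markers:

   ```rst
   .. exampleinclude:: /../tests/system/providers/google/cloud/gcs/example_s3_to_gcs_async.py
       :language: python
       :start-after: [START howto_transfer_s3togcs_operator_async]
       :end-before: [END howto_transfer_s3togcs_operator_async]
   ```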



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] VladaZakharova commented on pull request #29462: Deferrable mode for S3ToGCSOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29462:
URL: https://github.com/apache/airflow/pull/29462#issuecomment-1621209150

   Hi @potiuk @pankajastro !
   Could you please take a look at this PR? Thanks :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org