Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/06 19:35:32 UTC

[GitHub] kaxil closed pull request #4331: [AIRFLOW-3531] Add gcs to gcs transfer operator.

kaxil closed pull request #4331: [AIRFLOW-3531] Add gcs to gcs transfer operator.
URL: https://github.com/apache/airflow/pull/4331

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/contrib/hooks/gcp_transfer_hook.py b/airflow/contrib/hooks/gcp_transfer_hook.py
index 906dba786f..6966ec3ae2 100644
--- a/airflow/contrib/hooks/gcp_transfer_hook.py
+++ b/airflow/contrib/hooks/gcp_transfer_hook.py
@@ -26,7 +26,7 @@
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 # Time to sleep between active checks of the operation results
-TIME_TO_SLEEP_IN_SECONDS = 1
+TIME_TO_SLEEP_IN_SECONDS = 10
 
 
 # noinspection PyAbstractClass
@@ -56,10 +56,10 @@ def get_conn(self):
                                http=http_authorized, cache_discovery=False)
         return self._conn
 
-    def create_transfer_job(self, project_id, description, schedule, transfer_spec):
+    def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
         transfer_job = {
             'status': 'ENABLED',
-            'projectId': project_id,
+            'projectId': project_id or self.project_id,
             'description': description,
             'transferSpec': transfer_spec,
             'schedule': schedule or self._schedule_once_now(),
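
For context, this hook change makes project_id optional: when it is omitted,
'projectId' resolves to the project configured on the GCP connection
(self.project_id from GoogleCloudBaseHook). A minimal sketch of a call under
the new signature; the connection ID, description, and bucket names are
illustrative placeholders:

    from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook

    hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')

    # project_id is omitted, so the hook falls back to the connection's
    # project via `project_id or self.project_id`.
    job = hook.create_transfer_job(
        description='example-transfer',
        schedule=None,  # None schedules a single run starting immediately
        transfer_spec={
            'gcsDataSource': {'bucketName': 'my-source-bucket'},
            'gcsDataSink': {'bucketName': 'my-destination-bucket'},
        },
    )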
diff --git a/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
new file mode 100644
index 0000000000..410d65821d
--- /dev/null
+++ b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
+    """
+    Copies objects from one bucket to another using the GCP Storage Transfer
+    Service.
+
+    :param source_bucket: The source Google Cloud Storage bucket from which
+        objects are transferred. (templated)
+    :type source_bucket: str
+    :param destination_bucket: The destination Google Cloud Storage bucket
+        to which objects are transferred. (templated)
+    :type destination_bucket: str
+    :param project_id: Optional ID of the Google Cloud Platform Console project
+        that owns the job. If not provided, the project configured on the GCP
+        connection is used.
+    :type project_id: str
+    :param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud
+        Storage.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param description: Optional transfer service job description
+    :type description: str
+    :param schedule: Optional transfer service schedule; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
+        If not set, run transfer job once as soon as the operator runs
+    :type schedule: dict
+    :param object_conditions: Optional transfer service object conditions; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+    :type object_conditions: dict
+    :param transfer_options: Optional transfer service transfer options; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
+    :type transfer_options: dict
+    :param wait: Wait for transfer to finish; defaults to ``True``
+    :type wait: bool
+
+    **Example**:
+
+    .. code-block:: python
+
+       gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id='gcs_to_gcs_transfer_example',
+            source_bucket='my-source-bucket',
+            destination_bucket='my-destination-bucket',
+            project_id='my-gcp-project',
+            dag=my_dag)
+    """
+
+    template_fields = ('source_bucket', 'destination_bucket', 'description', 'object_conditions')
+    ui_color = '#e09411'
+
+    @apply_defaults
+    def __init__(self,
+                 source_bucket,
+                 destination_bucket,
+                 project_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 description=None,
+                 schedule=None,
+                 object_conditions=None,
+                 transfer_options=None,
+                 wait=True,
+                 *args,
+                 **kwargs):
+
+        super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(
+            *args,
+            **kwargs)
+        self.source_bucket = source_bucket
+        self.destination_bucket = destination_bucket
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.description = description
+        self.schedule = schedule
+        self.object_conditions = object_conditions or {}
+        self.transfer_options = transfer_options or {}
+        self.wait = wait
+
+    def execute(self, context):
+        transfer_hook = GCPTransferServiceHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to)
+
+        job = transfer_hook.create_transfer_job(
+            project_id=self.project_id,
+            description=self.description,
+            schedule=self.schedule,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': self.source_bucket,
+                },
+                'gcsDataSink': {
+                    'bucketName': self.destination_bucket,
+                },
+                'objectConditions': self.object_conditions,
+                'transferOptions': self.transfer_options,
+            }
+        )
+
+        if self.wait:
+            transfer_hook.wait_for_transfer_job(job)
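
Beyond the docstring example, a hedged usage sketch showing the optional
schedule, object_conditions, and wait parameters. All values are placeholders;
the schedule dict mirrors the format used in the tests below, and the object
condition key follows the ObjectConditions reference linked in the docstring:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
        GoogleCloudStorageToGoogleCloudStorageTransferOperator

    dag = DAG('gcs_to_gcs_transfer_example',
              start_date=datetime(2018, 10, 1),
              schedule_interval=None)

    gcs_transfer = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
        task_id='gcs_to_gcs_transfer_scheduled',
        source_bucket='my-source-bucket',
        destination_bucket='my-destination-bucket',
        project_id='my-gcp-project',  # may be omitted to use the connection's project
        schedule={
            'scheduleStartDate': {'year': 2018, 'month': 10, 'day': 1},
            'scheduleEndDate': {'year': 2018, 'month': 10, 'day': 31},
        },
        # Only transfer objects that have been unmodified for at least a day
        object_conditions={'minTimeElapsedSinceLastModification': '86400s'},
        wait=False,  # do not block the task until the transfer completes
        dag=dag)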
diff --git a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
index b0cf2ae515..c46a9460e7 100644
--- a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
@@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
     :param gcs_bucket: The destination Google Cloud Storage bucket
         where you want to store the files. (templated)
     :type gcs_bucket: str
-    :param project_id: The ID of the Google Cloud Platform Console project that
+    :param project_id: Optional ID of the Google Cloud Platform Console project that
         owns the job
     :type project_id: str
     :param aws_conn_id: The source S3 connection
@@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
         If not set, run transfer job once as soon as the operator runs
     :type schedule: dict
-    :param object_conditions: Transfer service object conditions; see
+    :param object_conditions: Optional transfer service object conditions; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type object_conditions: dict
-    :param transfer_options: Transfer service transfer options; see
+    :param transfer_options: Optional transfer service transfer options; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type transfer_options: dict
     :param wait: Wait for transfer to finish
@@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
     def __init__(self,
                  s3_bucket,
                  gcs_bucket,
-                 project_id,
+                 project_id=None,
                  aws_conn_id='aws_default',
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None,
diff --git a/docs/code.rst b/docs/code.rst
index e890adffec..3e1322a341 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -181,6 +181,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
diff --git a/docs/integration.rst b/docs/integration.rst
index f35c8e87ea..74f5552ae5 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -1184,6 +1184,7 @@ Storage Operators
 - :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object.
 - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
 - :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested.
+- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from one bucket to another using the Google Cloud Storage Transfer Service.
 - :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format.
 
 .. _FileToGoogleCloudStorageOperator:
@@ -1242,6 +1243,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator
 
 .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
 
+.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator:
+
+GoogleCloudStorageToGoogleCloudStorageTransferOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
+
 .. _MySqlToGoogleCloudStorageOperator:
 
 MySqlToGoogleCloudStorageOperator
diff --git a/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
new file mode 100644
index 0000000000..8c0cd4ebfc
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
+    GoogleCloudStorageToGoogleCloudStorageTransferOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-gcs-gcs-transfer-operator'
+SOURCE_BUCKET = 'test-source-bucket'
+DESTINATION_BUCKET = 'test-destination-bucket'
+PROJECT_ID = 'test-project'
+DESCRIPTION = 'test-description'
+SCHEDULE = {
+    'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
+    'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
+}
+
+
+class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
+    def test_constructor(self):
+        """Test GoogleCloudStorageToGoogleCloudStorageTransferOperator instance is properly initialized."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+        )
+
+        self.assertEqual(operator.task_id, TASK_ID)
+        self.assertEqual(operator.source_bucket, SOURCE_BUCKET)
+        self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET)
+        self.assertEqual(operator.project_id, PROJECT_ID)
+        self.assertEqual(operator.description, DESCRIPTION)
+        self.assertEqual(operator.schedule, SCHEDULE)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
+    def test_execute(self, mock_transfer_hook):
+        """Test the execute function when the run is successful."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+        )
+
+        operator.execute(None)
+
+        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': SOURCE_BUCKET,
+                },
+                'gcsDataSink': {
+                    'bucketName': DESTINATION_BUCKET,
+                },
+                'objectConditions': {},
+                'transferOptions': {}
+            }
+        )
+
+        mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
+            mock_transfer_hook.return_value.create_transfer_job.return_value
+        )
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
+    def test_execute_skip_wait(self, mock_transfer_hook):
+        """Test the execute function when the run is successful."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            wait=False,
+        )
+
+        operator.execute(None)
+
+        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=None,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': SOURCE_BUCKET,
+                },
+                'gcsDataSink': {
+                    'bucketName': DESTINATION_BUCKET,
+                },
+                'objectConditions': {},
+                'transferOptions': {}
+            }
+        )
+
+        assert not mock_transfer_hook.return_value.wait_for_transfer_job.called
diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
index 2bf51c0707..0825364884 100644
--- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -59,6 +59,7 @@ def test_constructor(self):
             gcs_bucket=GCS_BUCKET,
             project_id=PROJECT_ID,
             description=DESCRIPTION,
+            schedule=SCHEDULE,
         )
 
         self.assertEqual(operator.task_id, TASK_ID)
@@ -66,6 +67,7 @@ def test_constructor(self):
         self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
         self.assertEqual(operator.project_id, PROJECT_ID)
         self.assertEqual(operator.description, DESCRIPTION)
+        self.assertEqual(operator.schedule, SCHEDULE)
 
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')
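
On the TIME_TO_SLEEP_IN_SECONDS bump at the top of this diff: per the
constant's comment, the hook sleeps between active checks of the operation
results, so raising the interval from 1 to 10 seconds reduces API chatter at
the cost of slightly coarser completion detection. A rough sketch of the
general polling pattern, illustrative only and not the hook's actual code:

    import time

    TIME_TO_SLEEP_IN_SECONDS = 10


    def wait_for_job(job_done):
        """Illustrative polling loop; `job_done` stands in for whatever
        callable checks the transfer operation's status via the API."""
        while not job_done():
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)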

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services