Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/14 12:23:25 UTC

[GitHub] kaxil closed pull request #4279: [AIRFLOW-3444] Explicitly set transfer operator description.
URL: https://github.com/apache/incubator-airflow/pull/4279
This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is reproduced below for the sake of
provenance:

diff --git a/airflow/contrib/hooks/gcp_transfer_hook.py b/airflow/contrib/hooks/gcp_transfer_hook.py
index 88534a5103..906dba786f 100644
--- a/airflow/contrib/hooks/gcp_transfer_hook.py
+++ b/airflow/contrib/hooks/gcp_transfer_hook.py
@@ -56,34 +56,19 @@ def get_conn(self):
                                http=http_authorized, cache_discovery=False)
         return self._conn
 
-    def create_transfer_job(self, project_id, transfer_spec, **kwargs):
-        conn = self.get_conn()
-        now = datetime.datetime.utcnow()
+    def create_transfer_job(self, project_id, description, schedule, transfer_spec):
         transfer_job = {
             'status': 'ENABLED',
             'projectId': project_id,
+            'description': description,
             'transferSpec': transfer_spec,
-            'schedule': {
-                'scheduleStartDate': {
-                    'day': now.day,
-                    'month': now.month,
-                    'year': now.year,
-                },
-                'scheduleEndDate': {
-                    'day': now.day,
-                    'month': now.month,
-                    'year': now.year,
-                }
-            }
+            'schedule': schedule or self._schedule_once_now(),
         }
-        transfer_job.update(kwargs)
-        result = conn.transferJobs().create(body=transfer_job).execute()
-        self.wait_for_transfer_job(result, conn=conn)
+        return self.get_conn().transferJobs().create(body=transfer_job).execute()
 
-    def wait_for_transfer_job(self, job, conn=None):
-        conn = conn or self.get_conn()
+    def wait_for_transfer_job(self, job):
         while True:
-            result = conn.transferOperations().list(
+            result = self.get_conn().transferOperations().list(
                 name='transferOperations',
                 filter=json.dumps({
                     'project_id': job['projectId'],
@@ -105,3 +90,18 @@ def _check_operations_result(self, result):
             if operation['metadata']['status'] != 'SUCCESS':
                 return False
         return True
+
+    def _schedule_once_now(self):
+        now = datetime.datetime.utcnow()
+        return {
+            'scheduleStartDate': {
+                'day': now.day,
+                'month': now.month,
+                'year': now.year,
+            },
+            'scheduleEndDate': {
+                'day': now.day,
+                'month': now.month,
+                'year': now.year,
+            }
+        }
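
For reference, a minimal sketch of how the reworked hook API might be
called after this change. The connection ID, project, and bucket names
here are placeholders, not part of the PR; only the method signatures
come from the diff above:

    from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook

    # Hypothetical usage: create a one-off transfer job and block until
    # it completes. Passing schedule=None falls back to
    # _schedule_once_now(), i.e. the job runs once, today.
    hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')
    job = hook.create_transfer_job(
        project_id='my-project',
        description='nightly-s3-sync',
        schedule=None,
        transfer_spec={
            'awsS3DataSource': {'bucketName': 'my-s3-bucket'},
            'gcsDataSink': {'bucketName': 'my-gcs-bucket'},
        },
    )
    # Polling is now opt-in rather than implicit in create_transfer_job.
    hook.wait_for_transfer_job(job)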
diff --git a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
index e2fbf95b73..b0cf2ae515 100644
--- a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
@@ -45,15 +45,20 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
         For this to work, the service account making the request must have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param description: Optional transfer service job description
+    :type description: str
+    :param schedule: Optional transfer service schedule; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
+        If not set, the transfer job runs once as soon as the operator runs
+    :type schedule: dict
     :param object_conditions: Transfer service object conditions; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type object_conditions: dict
     :param transfer_options: Transfer service transfer options; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type transfer_options: dict
-    :param job_kwargs: Additional transfer job options; see
-        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs
-    :type job_kwargs: dict
+    :param wait: Wait for transfer to finish
+    :type wait: bool
 
     **Example**:
 
@@ -67,7 +72,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
             dag=my_dag)
     """
 
-    template_fields = ('s3_bucket', 'gcs_bucket')
+    template_fields = ('s3_bucket', 'gcs_bucket', 'description', 'object_conditions')
     ui_color = '#e09411'
 
     @apply_defaults
@@ -78,9 +83,11 @@ def __init__(self,
                  aws_conn_id='aws_default',
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None,
+                 description=None,
+                 schedule=None,
                  object_conditions=None,
                  transfer_options=None,
-                 job_kwargs=None,
+                 wait=True,
                  *args,
                  **kwargs):
 
@@ -93,9 +100,11 @@ def __init__(self,
         self.aws_conn_id = aws_conn_id
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
+        self.description = description
+        self.schedule = schedule
         self.object_conditions = object_conditions or {}
         self.transfer_options = transfer_options or {}
-        self.job_kwargs = job_kwargs or {}
+        self.wait = wait
 
     def execute(self, context):
         transfer_hook = GCPTransferServiceHook(
@@ -104,8 +113,10 @@ def execute(self, context):
 
         s3_creds = S3Hook(aws_conn_id=self.aws_conn_id).get_credentials()
 
-        transfer_hook.create_transfer_job(
+        job = transfer_hook.create_transfer_job(
             project_id=self.project_id,
+            description=self.description,
+            schedule=self.schedule,
             transfer_spec={
                 'awsS3DataSource': {
                     'bucketName': self.s3_bucket,
@@ -119,6 +130,8 @@ def execute(self, context):
                 },
                 'objectConditions': self.object_conditions,
                 'transferOptions': self.transfer_options,
-            },
-            **self.job_kwargs
+            }
         )
+
+        if self.wait:
+            transfer_hook.wait_for_transfer_job(job)
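
For illustration, a hedged sketch of how the operator might be wired
into a DAG with the new parameters. The DAG, dates, project, and bucket
names are placeholders; the operator arguments match the constructor in
the diff above:

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.s3_to_gcs_transfer_operator import \
        S3ToGoogleCloudStorageTransferOperator

    # Illustrative DAG; ids and dates are placeholders.
    dag = DAG('s3_to_gcs_transfer_example',
              start_date=datetime.datetime(2018, 12, 1),
              schedule_interval=None)

    transfer = S3ToGoogleCloudStorageTransferOperator(
        task_id='s3_to_gcs_transfer',
        s3_bucket='my-s3-bucket',
        gcs_bucket='my-gcs-bucket',
        project_id='my-project',
        description='nightly-s3-sync',  # templated via template_fields
        schedule={                      # optional; omit to run once now
            'scheduleStartDate': {'day': 1, 'month': 12, 'year': 2018},
            'scheduleEndDate': {'day': 31, 'month': 12, 'year': 2018},
        },
        wait=False,                     # return without polling the job
        dag=dag)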
diff --git a/tests/contrib/hooks/test_gcp_transfer_hook.py b/tests/contrib/hooks/test_gcp_transfer_hook.py
index a61716653a..1107775f91 100644
--- a/tests/contrib/hooks/test_gcp_transfer_hook.py
+++ b/tests/contrib/hooks/test_gcp_transfer_hook.py
@@ -41,8 +41,7 @@ def setUp(self):
             self.transfer_hook = GCPTransferServiceHook()
             self.transfer_hook._conn = self.conn
 
-    @mock.patch('airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook.wait_for_transfer_job')
-    def test_create_transfer_job(self, mock_wait):
+    def test_create_transfer_job(self):
         mock_create = self.conn.transferJobs.return_value.create
         mock_execute = mock_create.return_value.execute
         mock_execute.return_value = {
@@ -54,10 +53,12 @@ def test_create_transfer_job(self, mock_wait):
             'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
             'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
         }
-        self.transfer_hook.create_transfer_job('test-project', transfer_spec)
+        self.transfer_hook.create_transfer_job(
+            'test-project', 'test-description', None, transfer_spec)
         mock_create.assert_called_once_with(body={
             'status': 'ENABLED',
             'projectId': 'test-project',
+            'description': 'test-description',
             'transferSpec': transfer_spec,
             'schedule': {
                 'scheduleStartDate': {
@@ -72,7 +73,31 @@ def test_create_transfer_job(self, mock_wait):
                 }
             }
         })
-        mock_wait.assert_called_once_with(mock_execute.return_value, conn=self.conn)
+
+    def test_create_transfer_job_custom_schedule(self):
+        mock_create = self.conn.transferJobs.return_value.create
+        mock_execute = mock_create.return_value.execute
+        mock_execute.return_value = {
+            'projectId': 'test-project',
+            'name': 'transferJobs/test-job',
+        }
+        schedule = {
+            'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
+            'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
+        }
+        transfer_spec = {
+            'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
+            'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
+        }
+        self.transfer_hook.create_transfer_job(
+            'test-project', 'test-description', schedule, transfer_spec)
+        mock_create.assert_called_once_with(body={
+            'status': 'ENABLED',
+            'projectId': 'test-project',
+            'description': 'test-description',
+            'transferSpec': transfer_spec,
+            'schedule': schedule,
+        })
 
     @mock.patch('time.sleep')
     def test_wait_for_transfer_job(self, mock_sleep):
diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
index cc7bfd7a95..2bf51c0707 100644
--- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -36,8 +36,13 @@
 S3_BUCKET = 'test-s3-bucket'
 GCS_BUCKET = 'test-gcs-bucket'
 PROJECT_ID = 'test-project'
+DESCRIPTION = 'test-description'
 ACCESS_KEY = 'test-access-key'
 SECRET_KEY = 'test-secret-key'
+SCHEDULE = {
+    'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
+    'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
+}
 
 
 Credentials = collections.namedtuple(
@@ -53,23 +58,27 @@ def test_constructor(self):
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
             project_id=PROJECT_ID,
+            description=DESCRIPTION,
         )
 
         self.assertEqual(operator.task_id, TASK_ID)
         self.assertEqual(operator.s3_bucket, S3_BUCKET)
         self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
         self.assertEqual(operator.project_id, PROJECT_ID)
+        self.assertEqual(operator.description, DESCRIPTION)
 
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')
     def test_execute(self, mock_s3_hook, mock_transfer_hook):
-        """Test the execute function when the run is successful."""
+        """Test the execute function with a custom schedule."""
 
         operator = S3ToGoogleCloudStorageTransferOperator(
             task_id=TASK_ID,
             s3_bucket=S3_BUCKET,
             gcs_bucket=GCS_BUCKET,
             project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
         )
 
         mock_s3_hook.return_value.get_credentials.return_value = Credentials(
@@ -81,6 +90,8 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
 
         mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
             project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
             transfer_spec={
                 'awsS3DataSource': {
                     'bucketName': S3_BUCKET,
@@ -96,3 +107,50 @@ def test_execute(self, mock_s3_hook, mock_transfer_hook):
                 'transferOptions': {}
             }
         )
+
+        mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
+            mock_transfer_hook.return_value.create_transfer_job.return_value
+        )
+
+    @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
+    @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')
+    def test_execute_skip_wait(self, mock_s3_hook, mock_transfer_hook):
+        """Test the execute function and wait until transfer is complete."""
+
+        operator = S3ToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            s3_bucket=S3_BUCKET,
+            gcs_bucket=GCS_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            wait=False,
+        )
+
+        mock_s3_hook.return_value.get_credentials.return_value = Credentials(
+            access_key=ACCESS_KEY,
+            secret_key=SECRET_KEY,
+        )
+
+        operator.execute(None)
+
+        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=None,
+            transfer_spec={
+                'awsS3DataSource': {
+                    'bucketName': S3_BUCKET,
+                    'awsAccessKey': {
+                        'accessKeyId': ACCESS_KEY,
+                        'secretAccessKey': SECRET_KEY,
+                    }
+                },
+                'gcsDataSink': {
+                    'bucketName': GCS_BUCKET,
+                },
+                'objectConditions': {},
+                'transferOptions': {}
+            }
+        )
+
+        assert not mock_transfer_hook.return_value.wait_for_transfer_job.called


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services