You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/22 08:45:04 UTC

[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r637375774



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+

Review comment:
       Running and stopped waiters in DMS have an unusual behaviour that is unintuitive for users:
   https://github.com/boto/botocore/blob/develop/botocore/data/dms/2016-01-01/waiters-2.json#L168
   https://github.com/boto/boto3/issues/1926
   
   Specifically, the `ready` status is considered a failure state by the `running` waiter.
   
   In case of DMS, the most natural way of using the start task operator would be:
   "Create task" >> "Start task"
   
   If we add a `running` waiter to the start-task function, it fails because of the `ready` state that the task transitions into. I tested this and that was the result, so I opted not to use the waiter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org