Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/03 09:14:50 UTC

[GitHub] [airflow] Visya opened a new pull request #15850: Add AWS DMS replication task operators

Visya opened a new pull request #15850:
URL: https://github.com/apache/airflow/pull/15850


   
   This PR adds ability to work with AWS DMS replication tasks in Airflow:
   * AWS DMS base hook.
   * AWS DMS replication task base and completion sensor.
   * Operators for creating, starting, stopping, deleting and describing AWS DMS replication tasks.
   * Example of a DAG for creating, waiting for completion and deleting a DMS replication task.
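The completion sensor listed above essentially polls the task status until the task finishes. Below is a minimal sketch of a single poke. It assumes the sensor treats `stopped` as success and a few other statuses as dead ends. `poke_replication_task`, `ReplicationTaskFailed` and the status groupings are hypothetical and are not the PR's code. `get_status` stands in for something like `DmsHook.get_task_status`, which returns `None` for an unknown ARN.

```python
from typing import Callable, Collection, Optional


class ReplicationTaskFailed(Exception):
    """Hypothetical error: the task can no longer reach a target status."""


# Assumed status groupings; the PR's actual sensor may use different ones.
TARGET_STATUSES = ("stopped",)
TERMINATION_STATUSES = ("failed", "failed-move", "deleting")


def poke_replication_task(
    get_status: Callable[[str], Optional[str]],
    replication_task_arn: str,
    target_statuses: Collection[str] = TARGET_STATUSES,
    termination_statuses: Collection[str] = TERMINATION_STATUSES,
) -> bool:
    """Return True when done, False to poke again, and raise on a dead end."""
    status = get_status(replication_task_arn)
    if status is None:
        # Mirrors get_task_status in the diff, which returns None for unknown ARNs.
        raise ReplicationTaskFailed(f"Task {replication_task_arn} not found")
    if status in target_statuses:
        return True
    if status in termination_statuses:
        raise ReplicationTaskFailed(f"Task {replication_task_arn} is in status {status!r}")
    # Any other status (e.g. "running") means: keep waiting.
    return False
```

A dict's `.get` works as a stand-in status source. For example, `poke_replication_task({"arn:task": "running"}.get, "arn:task")` returns `False`.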
   
   Addresses a JIRA issue: [AIRFLOW-2334](https://issues.apache.org/jira/browse/AIRFLOW-2334)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r639922333



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       Interesting: that example has both a `**kwargs` and a `create_job_kwargs`. I suppose the latter is required if it is distinctly different from the former, but your method didn't have a `**kwargs` to begin with.
   
   Perhaps one of the other reviewers has more historical context on this matter?  
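A separate dict such as `create_job_kwargs` can be needed next to `**kwargs` because an operator's `**kwargs` normally carries `BaseOperator` arguments such as `task_id`. The minimal sketch below shows what happens when the same `**kwargs` also doubles as API arguments. It uses a hypothetical `FakeBaseOperator` in place of Airflow's `BaseOperator`. The real class does its own argument handling, so the exact failure mode may differ.

```python
from typing import Any, Dict, Optional


class FakeBaseOperator:
    """Hypothetical stand-in for BaseOperator: accepts only its own arguments."""

    def __init__(self, task_id: str, retries: int = 0) -> None:
        self.task_id = task_id
        self.retries = retries


class OperatorReusingKwargs(FakeBaseOperator):
    """Treats the operator's **kwargs as extra DMS API arguments."""

    def __init__(self, table_mappings: dict, **kwargs: Any) -> None:
        super().__init__(**kwargs)  # BaseOperator arguments live in kwargs...
        self.table_mappings = table_mappings
        self.create_task_kwargs = kwargs  # ...so they would leak into the API call


class OperatorWithDedicatedDict(FakeBaseOperator):
    """Keeps DMS API arguments separate from BaseOperator arguments."""

    def __init__(
        self,
        table_mappings: dict,
        create_task_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.table_mappings = table_mappings
        self.create_task_kwargs = create_task_kwargs or {}
```

With `OperatorReusingKwargs(table_mappings={}, task_id="create_task")`, `create_task_kwargs` becomes `{'task_id': 'create_task'}`, and that value would then be forwarded to the DMS API. A genuine API argument such as `Tags=[...]` fails in the base-class constructor with a `TypeError`. The dedicated dict avoids both problems.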







[GitHub] [airflow] potiuk closed pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #15850:
URL: https://github.com/apache/airflow/pull/15850


   





[GitHub] [airflow] Visya edited a comment on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya edited a comment on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-854616557


   Still timed out at pylint; I've updated my fork to see if this would pass.
   
   **Update:** Static linting failed on files unrelated to the PR.





[GitHub] [airflow] Visya edited a comment on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya edited a comment on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-854616557


   Still timed out at pylint; I've updated my fork to see if this would pass.
   
   **Update:** Static linting failed on files unrelated to the PR...





[GitHub] [airflow] potiuk merged pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #15850:
URL: https://github.com/apache/airflow/pull/15850


   





[GitHub] [airflow] potiuk closed pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #15850:
URL: https://github.com/apache/airflow/pull/15850


   





[GitHub] [airflow] Visya commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-854616557


   Still timed out at pylint; I've updated my fork to see if this would pass.





[GitHub] [airflow] uranusjr commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r640626547



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       So I would still have `create_task_kwargs` in `DmsCreateTaskOperator`, but have `DmsHook.describe_replication_tasks()` take a `**kwargs`, and do the `**` unpacking where `DmsHook.describe_replication_tasks` is called.
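Below is a minimal sketch of that suggestion, applied to task creation. The hook method accepts `**kwargs` and forwards them verbatim. The operator keeps a dedicated `create_task_kwargs` dict and unpacks it only at the call site. `FakeDmsClient`, `SketchDmsHook` and `SketchCreateTaskOperator` are hypothetical stand-ins for `boto3.client('dms')`, `DmsHook` and `DmsCreateTaskOperator`. The required endpoint, instance and migration-type arguments are trimmed for brevity.

```python
import json
from typing import Any, Dict, List, Optional


class FakeDmsClient:
    """Hypothetical stand-in for boto3.client('dms'); records each call."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def create_replication_task(self, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append(kwargs)
        return {"ReplicationTask": {"ReplicationTaskArn": "arn:aws:dms:example:task:1"}}


class SketchDmsHook:
    """Hook side: named arguments plus **kwargs forwarded verbatim to the client."""

    def __init__(self, client: FakeDmsClient) -> None:
        self.client = client

    def create_replication_task(
        self, replication_task_id: str, table_mappings: dict, **kwargs: Any
    ) -> str:
        response = self.client.create_replication_task(
            ReplicationTaskIdentifier=replication_task_id,
            TableMappings=json.dumps(table_mappings),
            **kwargs,
        )
        return response["ReplicationTask"]["ReplicationTaskArn"]


class SketchCreateTaskOperator:
    """Operator side: a dedicated dict, unpacked only at the hook call site."""

    def __init__(
        self,
        replication_task_id: str,
        table_mappings: dict,
        create_task_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.replication_task_id = replication_task_id
        self.table_mappings = table_mappings
        self.create_task_kwargs = create_task_kwargs or {}

    def execute(self, hook: SketchDmsHook) -> str:
        return hook.create_replication_task(
            replication_task_id=self.replication_task_id,
            table_mappings=self.table_mappings,
            **self.create_task_kwargs,  # the ** unpacking happens here
        )
```

With this split, the hook reads naturally when called from Python code, and the operator still exposes a single templatable dict.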







[GitHub] [airflow] Visya edited a comment on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya edited a comment on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-854616557


   Still timed out at pylint; I've updated my fork to see if this would pass.
   
   Update: Static linting failed on files unrelated to the PR...





[GitHub] [airflow] o-nikolas commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r636524649



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+
+    def stop_replication_task(self, replication_task_arn):
+        """
+        Stops replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)
+

Review comment:
       Do you want a call to `wait_for_task_status(replication_task_arn, 'stopped')` here?
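Waiting after a stop is essentially a polling loop over the task status. Below is a minimal sketch that assumes a zero-argument `get_status` callable, for example one wrapping `DmsHook.get_task_status` for a fixed ARN. `wait_for_status`, `poke_interval` and `timeout` are hypothetical names. This is not the PR's `wait_for_task_status`, which is only partly visible in this thread. boto3 also provides DMS waiters such as `replication_task_stopped`, which may be the simpler route.

```python
import time
from typing import Callable, Optional


def wait_for_status(
    get_status: Callable[[], Optional[str]],
    expected: Optional[str],
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll get_status until it returns `expected`, or raise TimeoutError."""
    waited = 0.0
    while True:
        status = get_status()
        if status == expected:
            return
        if waited >= timeout:
            raise TimeoutError(
                f"Status is {status!r} after {waited:.0f}s, expected {expected!r}"
            )
        sleep(poke_interval)
        waited += poke_interval  # counts sleep time only, not API latency
```

To wait for deletion, pass `expected=None`, because `get_task_status` in the diff returns `None` once the task is no longer found.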







[GitHub] [airflow] potiuk edited a comment on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-890304214


   https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/index.html#id1 It's in the 2.0.0 providers according to the changelog. Don't you see it there?





[GitHub] [airflow] tooptoop4 commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-890273515


   @potiuk, this doesn't seem to be in any release?





[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r637377348



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       I relied on some of the examples in the aws provider code, for example:
   https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/hooks/glue.py#L64
   
   Would you suggest using kwargs in both hook functions and the operator, something like this?
   ``` diff
   diff --git a/airflow/providers/amazon/aws/hooks/dms.py b/airflow/providers/amazon/aws/hooks/dms.py
   index 92b9f4924..1ead73c7e 100644
   --- a/airflow/providers/amazon/aws/hooks/dms.py
   +++ b/airflow/providers/amazon/aws/hooks/dms.py
   @@ -105,7 +105,7 @@ class DmsHook(AwsBaseHook):
            replication_instance_arn: str,
            migration_type: str,
            table_mappings: dict,
   -        create_task_kwargs: Optional[dict] = None,
   +        **kwargs,
        ) -> str:
            """
            Create DMS replication task
   @@ -122,11 +122,8 @@ class DmsHook(AwsBaseHook):
            :type table_mappings: dict
            :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
            :type migration_type: str
   -        :param create_task_kwargs: Extra arguments for DMS replication task creation.
   -        :type create_task_kwargs: Optional[dict]
            :return: Replication task ARN
            """
   -        additional_args = create_task_kwargs or {}
            dms_client = self.get_conn()
            create_task_response = dms_client.create_replication_task(
                ReplicationTaskIdentifier=replication_task_id,
   @@ -135,7 +132,7 @@ class DmsHook(AwsBaseHook):
                ReplicationInstanceArn=replication_instance_arn,
                MigrationType=migration_type,
                TableMappings=json.dumps(table_mappings),
   -            **additional_args,
   +            **kwargs,
            )
    
            replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
   diff --git a/airflow/providers/amazon/aws/operators/dms_create_task.py b/airflow/providers/amazon/aws/operators/dms_create_task.py
   index 22ddcd958..8cd2f327c 100644
   --- a/airflow/providers/amazon/aws/operators/dms_create_task.py
   +++ b/airflow/providers/amazon/aws/operators/dms_create_task.py
   @@ -43,8 +43,6 @@ class DmsCreateTaskOperator(BaseOperator):
        :type table_mappings: dict
        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
        :type migration_type: str
   -    :param create_task_kwargs: Extra arguments for DMS replication task creation.
   -    :type create_task_kwargs: Optional[dict]
        :param aws_conn_id: The Airflow connection used for AWS credentials.
            If this is None or empty then the default boto3 behaviour is used. If
            running Airflow in a distributed manner and aws_conn_id is None or
   @@ -60,13 +58,9 @@ class DmsCreateTaskOperator(BaseOperator):
            'replication_instance_arn',
            'table_mappings',
            'migration_type',
   -        'create_task_kwargs',
        )
        template_ext = ()
   -    template_fields_renderers = {
   -        "table_mappings": "json",
   -        "create_task_kwargs": "json",
   -    }
   +    template_fields_renderers = {"table_mappings": "json"}
    
        @apply_defaults
        def __init__(
   @@ -78,7 +72,6 @@ class DmsCreateTaskOperator(BaseOperator):
            replication_instance_arn: str,
            table_mappings: dict,
            migration_type: Optional[str] = 'full-load',
   -        create_task_kwargs: Optional[dict] = None,
            aws_conn_id: str = 'aws_default',
            **kwargs,
        ):
   @@ -89,7 +82,7 @@ class DmsCreateTaskOperator(BaseOperator):
            self.replication_instance_arn = replication_instance_arn
            self.migration_type = migration_type
            self.table_mappings = table_mappings
   -        self.create_task_kwargs = create_task_kwargs
   +        self.create_task_kwargs = kwargs
            self.aws_conn_id = aws_conn_id
    
        def execute(self, context):
   @@ -107,7 +100,7 @@ class DmsCreateTaskOperator(BaseOperator):
                replication_instance_arn=self.replication_instance_arn,
                migration_type=self.migration_type,
                table_mappings=self.table_mappings,
   -            create_task_kwargs=self.create_task_kwargs,
   +            **self.create_task_kwargs,
            )
            self.log.info("DMS replication task(%s) is ready.", self.replication_task_id)
   ```
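Whichever signature is chosen, `describe_replication_tasks` returns the `Marker` alongside the tasks so that callers can page through the results. Below is a minimal sketch of that loop. It assumes a `describe` callable that accepts boto3-style keyword arguments and returns `(marker, tasks)`, as the hook method does. `iter_replication_tasks` is a hypothetical helper. With the current `describe_tasks_kwargs` signature, the hook would need a small wrapper such as `lambda **kw: hook.describe_replication_tasks(describe_tasks_kwargs=kw)`. The hook as posted also indexes `response['Marker']` directly. AWS may omit `Marker` on the last page, in which case `response.get('Marker')` would be safer.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

DescribeFn = Callable[..., Tuple[Optional[str], List[Dict[str, Any]]]]


def iter_replication_tasks(describe: DescribeFn, **filters: Any) -> Iterator[Dict[str, Any]]:
    """Yield every task across pages, following the Marker until it is empty."""
    marker: Optional[str] = None
    while True:
        kwargs = dict(filters)  # fresh dict per page so earlier calls are unaffected
        if marker:
            kwargs["Marker"] = marker
        marker, tasks = describe(**kwargs)
        yield from tasks
        if not marker:
            return
```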







[GitHub] [airflow] o-nikolas commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r636523464



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+
+    def stop_replication_task(self, replication_task_arn):
+        """
+        Stops replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)
+
+    def delete_replication_task(self, replication_task_arn):
+        """
+        Starts replication task deletion and waits for it to be deleted
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.delete_replication_task(ReplicationTaskArn=replication_task_arn)
+
+        self.wait_for_task_status(replication_task_arn, 'deleted')
+
+    def wait_for_task_status(self, replication_task_arn: str, status: str):
+        """
+        Waits for replication task to reach status.
+        Supported statuses: deleted, ready, running, stopped.
+
+        :param status: Status to wait for
+        :type status: str
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        available_statuses = ['deleted', 'ready', 'running', 'stopped']

Review comment:
       Small nit: you could move this to an Enum instead of a local variable. That way, the method signature would also be more strongly typed.
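A minimal sketch of the suggested Enum, assuming the four statuses listed in `available_statuses`. The class name `DmsTaskWaiterStatus` and the helper `validate_status` are hypothetical, not part of the PR. Mixing in `str` keeps members comparable to the raw status strings that DMS returns.

```python
from enum import Enum
from typing import Union


class DmsTaskWaiterStatus(str, Enum):
    """Hypothetical enum replacing the ``available_statuses`` list."""

    DELETED = 'deleted'
    READY = 'ready'
    RUNNING = 'running'
    STOPPED = 'stopped'


def validate_status(status: Union[str, DmsTaskWaiterStatus]) -> DmsTaskWaiterStatus:
    """Convert a raw string (or an existing member) into an enum member.

    Raises ValueError for unsupported statuses, which replaces the manual
    ``status not in available_statuses`` membership check.
    """
    return DmsTaskWaiterStatus(status)
```

`validate_status('ready')` returns the `READY` member, while an unsupported string such as `'paused'` raises `ValueError`.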




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r637375959



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+
+    def stop_replication_task(self, replication_task_arn):
+        """
+        Stops replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)
+

Review comment:
       As with not using the `running` waiter in the start task, I have opted not to use the `stopped` waiter here, since it does not seem to work in a straightforward way.
   
   The running and stopped waiters in DMS behave in an unusual way that is unintuitive for users:
   https://github.com/boto/botocore/blob/develop/botocore/data/dms/2016-01-01/waiters-2.json#L168
   https://github.com/boto/boto3/issues/1926
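If the botocore waiters are skipped, one option is to poll the task status directly, for example through the hook's `get_task_status`. A minimal sketch, assuming a zero-argument callable that returns the current status string (or `None`); the function name, the default interval and timeout, and treating `'failed'` as the failure status are assumptions, not part of the PR. `sleep` and `clock` are injectable so the loop can be tested without AWS.

```python
import time
from typing import Callable, Iterable, Optional


def poll_task_status(
    get_status: Callable[[], Optional[str]],
    success: Iterable[str],
    failure: Iterable[str] = ('failed',),
    interval: float = 15.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll ``get_status`` until it returns a status in ``success``.

    Raises RuntimeError on a status in ``failure`` and TimeoutError
    once ``timeout`` seconds have elapsed without success.
    """
    ok, bad = set(success), set(failure)
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in ok:
            return status
        if status in bad:
            raise RuntimeError(f'Replication task reached status {status!r}')
        if clock() >= deadline:
            raise TimeoutError(f'Gave up waiting; last status was {status!r}')
        sleep(interval)
```

With the hook from this PR, a call could look like `poll_task_status(lambda: hook.get_task_status(arn), success={'stopped'})`.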







[GitHub] [airflow] Visya commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-853666644


   The pylint CI check times out for some reason, but pylint passes when I run it locally, and all the tests pass. From what I can see, most CI builds have failed over the past couple of days, so I don't think the failure is specific to my PR.
   
   This is ready to merge unless you have any more comments. 🙂 





[GitHub] [airflow] o-nikolas commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r636523602



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+    def wait_for_task_status(self, replication_task_arn: str, status: str):
+        """
+        Waits for replication task to reach status.
+        Supported statuses: deleted, ready, running, stopped.
+
+        :param status: Status to wait for
+        :type status: str
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        available_statuses = ['deleted', 'ready', 'running', 'stopped']

Review comment:
       Nit: you could move this to an Enum instead of a list of magic strings. That way the method signature would also be more strongly typed, taking the enum as input instead of a string.
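A minimal sketch of the stronger typing this comment suggests, assuming the waiter-backed approach: the enum name is hypothetical, `dms_client` is a boto3 DMS client passed in explicitly, and the `replication_task_<status>` names follow botocore's snake_case naming of the DMS `ReplicationTaskReady`/`Running`/`Stopped`/`Deleted` waiters, which poll `DescribeReplicationTasks`.

```python
from enum import Enum
from typing import Any


class DmsTaskWaiterStatus(str, Enum):
    """Statuses that have a matching botocore DMS waiter (hypothetical name)."""

    DELETED = 'deleted'
    READY = 'ready'
    RUNNING = 'running'
    STOPPED = 'stopped'


def wait_for_task_status(dms_client: Any, replication_task_arn: str, status: DmsTaskWaiterStatus) -> None:
    """Block until the task reaches ``status`` using the botocore waiter.

    Accepting only enum members makes the valid values explicit in the
    signature and replaces the ``available_statuses`` list check.
    """
    if not isinstance(status, DmsTaskWaiterStatus):
        raise TypeError('status must be a DmsTaskWaiterStatus member')
    # e.g. DmsTaskWaiterStatus.READY -> 'replication_task_ready'
    waiter = dms_client.get_waiter(f'replication_task_{status.value}')
    waiter.wait(
        Filters=[{'Name': 'replication-task-arn', 'Values': [replication_task_arn]}],
        WithoutSettings=True,
    )
```

Using `status.value` in the f-string avoids depending on how different Python versions format `str`-mixin enum members.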

##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+

Review comment:
       Do you want a call to `wait_for_task_status(replication_task_arn, 'running')` here?
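One caveat if a wait is added after starting: a task with `MigrationType='full-load'` stops on its own once the load finishes, so a short task may move from `starting` to `stopped` between two polls and never be observed as `running`. A minimal sketch, assuming a status callable such as the hook's `get_task_status`, that treats either status as a successful start; the function name, the `'failed'` check, and the defaults are assumptions.

```python
import time
from typing import Callable, Optional

# Assumption: a full-load-only task may reach 'stopped' on its own before any
# poll observes 'running', so both count as "the start request took effect".
# Caveat: if the task was already 'stopped' before the start call, a stale
# first read could also match; resuming a stopped task may need a stricter check.
STARTED_STATUSES = frozenset({'running', 'stopped'})


def wait_until_started(
    get_status: Callable[[], Optional[str]],
    max_attempts: int = 60,
    interval: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the task reports a status in STARTED_STATUSES and return it."""
    for _ in range(max_attempts):
        status = get_status()
        if status in STARTED_STATUSES:
            return status
        if status == 'failed':
            raise RuntimeError('Replication task failed to start')
        sleep(interval)
    raise TimeoutError(f'Task did not start after {max_attempts} attempts')
```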

##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+    def stop_replication_task(self, replication_task_arn):
+        """
+        Stops replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)
+

Review comment:
       Do you want a call to `wait_for_task_status(replication_task_arn, 'stopped')` here?
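A minimal sketch of what the reviewer is suggesting, written as a standalone function rather than a hook method. It assumes `dms_client` is a boto3 DMS client (as returned by the hook's `get_conn()`) and uses boto3's `replication_task_stopped` waiter. The helper name `stop_and_wait` is hypothetical. The quoted diff is cut off before the body of `wait_for_task_status`, so this does not show how the PR's method actually waits.

```python
from typing import Any


def stop_and_wait(dms_client: Any, replication_task_arn: str) -> None:
    """Stop a DMS replication task, then block until it reports 'stopped'.

    Hypothetical helper. ``dms_client`` is assumed to be a boto3 DMS client,
    e.g. ``boto3.client("dms")``, or any object exposing the same methods.
    """
    dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)

    # The waiter polls DescribeReplicationTasks. Filtering by ARN restricts
    # the check to this one task instead of every task in the account.
    waiter = dms_client.get_waiter("replication_task_stopped")
    waiter.wait(
        Filters=[{"Name": "replication-task-arn", "Values": [replication_task_arn]}],
        WithoutSettings=True,
    )
```

Keep in mind the caveat the author raises later in this thread: some DMS waiters treat intermediate statuses as failures.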

##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       Small nit: instead of requesting a dict explicitly, you could gather the individual arguments with Python's keyword-argument syntax, `**`, usually seen as `**kwargs`. This might be a bit more Pythonic.
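To illustrate the suggestion, here is a sketch that takes keyword arguments directly and forwards them to boto3's `describe_replication_tasks`, which accepts `Filters`, `MaxRecords`, `Marker` and `WithoutSettings`. It is written as a free function over an assumed boto3 DMS client, not as the hook method from the PR. It also reads `Marker` with `.get()`. This assumes boto3 omits the key when there are no further pages. The quoted code instead indexes `response['Marker']` directly.

```python
from typing import Any, List, Optional, Tuple


def describe_replication_tasks(dms_client: Any, **kwargs: Any) -> Tuple[Optional[str], List[dict]]:
    """Describe DMS replication tasks, forwarding every keyword argument to boto3.

    ``dms_client`` is assumed to be a boto3 DMS client. Typical keyword
    arguments are ``Filters``, ``MaxRecords``, ``Marker`` and ``WithoutSettings``.
    """
    response = dms_client.describe_replication_tasks(**kwargs)
    # Assumption: 'Marker' is absent on the last page, so avoid a KeyError.
    return response.get("Marker"), response.get("ReplicationTasks", [])
```

A call then reads like the boto3 API itself, for example `marker, tasks = describe_replication_tasks(client, MaxRecords=20, WithoutSettings=True)`.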




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r637379993



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+
+    def stop_replication_task(self, replication_task_arn):
+        """
+        Stops replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.stop_replication_task(ReplicationTaskArn=replication_task_arn)
+
+    def delete_replication_task(self, replication_task_arn):
+        """
+        Starts replication task deletion and waits for it to be deleted
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        dms_client = self.get_conn()
+        dms_client.delete_replication_task(ReplicationTaskArn=replication_task_arn)
+
+        self.wait_for_task_status(replication_task_arn, 'deleted')
+
+    def wait_for_task_status(self, replication_task_arn: str, status: str):
+        """
+        Waits for replication task to reach status.
+        Supported statuses: deleted, ready, running, stopped.
+
+        :param status: Status to wait for
+        :type status: str
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        """
+        available_statuses = ['deleted', 'ready', 'running', 'stopped']

Review comment:
       Good point, thanks! 👍




[GitHub] [airflow] Visya edited a comment on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya edited a comment on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-853666644


   The pylint CI times out for some reason, but it works when I run it locally, and all the tests pass. From what I see, most of the CI builds have failed in the past couple of days, so I don't think it's specific to my PR.
   
   This is ready to merge unless you have any more comments. 🙂


[GitHub] [airflow] Visya commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-853666644


   The pylint CI times out for some reason, but it works when I run it locally, and all the tests pass. From what I see, most of the CI builds have failed in the past couple of days, so I don't think it's specific to my MR.
   
   This is ready to merge unless you have any more comments. 🙂


[GitHub] [airflow] Visya commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-855824130


   I'm still unable to make this pass; it times out at pylint.
   I have run pre-commit locally, and it succeeds.


[GitHub] [airflow] potiuk commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-853720254


   There was a GitHub failure. Closing/reopening to rebuild.


[GitHub] [airflow] github-actions[bot] commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-841516482


   [The Workflow run](https://github.com/apache/airflow/actions/runs/843378547) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.


[GitHub] [airflow] Visya commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-859383978


   The pipeline finally succeeded 🙂


[GitHub] [airflow] uranusjr commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r640624660



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       In `AwsGlueJobOperator` you can’t simply replace `create_job_kwargs` with `**kwargs`: that operator accepts many arguments of its own, and readability would suffer if some of them were passed to boto3 and others were not. So a separate `create_job_kwargs` is better in that case.
   
   Here, though, all of `describe_replication_tasks`’s arguments are simply forwarded to the boto3 client, so the extra dict feels unnecessary.
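The distinction can be sketched in plain Python. An operator-like class keeps its own constructor arguments and collects the boto3 arguments in a separate dict, while the hook method simply forwards `**kwargs`. Both classes below are hypothetical stand-ins, not Airflow's `DmsHook` or any real operator. `execute` takes the hook directly instead of an Airflow context so the sketch runs without Airflow or AWS.

```python
from typing import Any, Dict, List, Optional


class DmsHookSketch:
    """Hypothetical hook: every keyword argument goes straight to the client."""

    def __init__(self, client: Any) -> None:
        self.client = client

    def describe_replication_tasks(self, **kwargs: Any) -> List[dict]:
        return self.client.describe_replication_tasks(**kwargs).get("ReplicationTasks", [])


class DescribeTasksOperatorSketch:
    """Hypothetical operator: its own settings stay separate from boto3 arguments."""

    def __init__(
        self,
        task_id: str,
        aws_conn_id: str = "aws_default",
        describe_tasks_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.task_id = task_id  # operator-level setting, never sent to AWS
        self.aws_conn_id = aws_conn_id  # operator-level setting, never sent to AWS
        # Only this dict reaches boto3, so readers can tell at a glance
        # which arguments end up in the AWS API call.
        self.describe_tasks_kwargs = describe_tasks_kwargs or {}

    def execute(self, hook: DmsHookSketch) -> List[dict]:
        return hook.describe_replication_tasks(**self.describe_tasks_kwargs)
```

At the operator level the dict avoids mixing operator and API arguments. At the hook level, where nothing else is accepted, `**kwargs` is enough.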




[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r637375774



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,
+    ):
+        """
+        Describe replication tasks
+        :param describe_tasks_kwargs: Describe tasks command arguments
+        :type describe_tasks_kwargs: Optional[dict]
+
+        :return: Marker and list of replication tasks
+        :rtype: (str, list)
+        """
+        additional_args = describe_tasks_kwargs or {}
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(**additional_args)
+
+        return response['Marker'], response['ReplicationTasks']
+
+    def find_replication_tasks_by_arn(
+        self, replication_task_arn: str, without_settings: Optional[bool] = False
+    ):
+        """
+        Find and describe replication tasks by task ARN
+        :param replication_task_arn: Replication task arn
+        :type replication_task_arn: str
+        :param without_settings: Indicates whether to return task information with settings.
+        :type without_settings: Optional[bool]
+
+        :return: list of replication tasks that match the ARN
+        """
+        dms_client = self.get_conn()
+        response = dms_client.describe_replication_tasks(
+            Filters=[
+                {
+                    'Name': 'replication-task-arn',
+                    'Values': [replication_task_arn],
+                }
+            ],
+            WithoutSettings=without_settings,
+        )
+
+        return response['ReplicationTasks']
+
+    def get_task_status(self, replication_task_arn: str) -> Optional[str]:
+        """
+        Retrieve task status.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :return: Current task status
+        """
+        replication_tasks = self.find_replication_tasks_by_arn(
+            replication_task_arn=replication_task_arn,
+            without_settings=True,
+        )
+
+        if len(replication_tasks) == 1:
+            status = replication_tasks[0]['Status']
+            self.log.info('Replication task with ARN(%s) has status "%s".', replication_task_arn, status)
+            return status
+        else:
+            self.log.info('Replication task with ARN(%s) is not found.', replication_task_arn)
+            return None
+
+    def create_replication_task(
+        self,
+        replication_task_id: str,
+        source_endpoint_arn: str,
+        target_endpoint_arn: str,
+        replication_instance_arn: str,
+        migration_type: str,
+        table_mappings: dict,
+        create_task_kwargs: Optional[dict] = None,
+    ) -> str:
+        """
+        Create DMS replication task
+
+        :param replication_task_id: Replication task id
+        :type replication_task_id: str
+        :param source_endpoint_arn: Source endpoint ARN
+        :type source_endpoint_arn: str
+        :param target_endpoint_arn: Target endpoint ARN
+        :type target_endpoint_arn: str
+        :param replication_instance_arn: Replication instance ARN
+        :type replication_instance_arn: str
+        :param table_mappings: Table mappings
+        :type table_mappings: dict
+        :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
+        :type migration_type: str
+        :param create_task_kwargs: Extra arguments for DMS replication task creation.
+        :type create_task_kwargs: Optional[dict]
+        :return: Replication task ARN
+        """
+        additional_args = create_task_kwargs or {}
+        dms_client = self.get_conn()
+        create_task_response = dms_client.create_replication_task(
+            ReplicationTaskIdentifier=replication_task_id,
+            SourceEndpointArn=source_endpoint_arn,
+            TargetEndpointArn=target_endpoint_arn,
+            ReplicationInstanceArn=replication_instance_arn,
+            MigrationType=migration_type,
+            TableMappings=json.dumps(table_mappings),
+            **additional_args,
+        )
+
+        replication_task_arn = create_task_response['ReplicationTask']['ReplicationTaskArn']
+        self.wait_for_task_status(replication_task_arn, 'ready')
+
+        return replication_task_arn
+
+    def start_replication_task(
+        self,
+        replication_task_arn: str,
+        start_replication_task_type: str,
+        start_task_kwargs: Optional[dict] = None,
+    ):
+        """
+        Starts replication task.
+
+        :param replication_task_arn: Replication task ARN
+        :type replication_task_arn: str
+        :param start_replication_task_type: Replication task start type
+            ('start-replication'|'resume-processing'|'reload-target')
+        :type start_replication_task_type: str
+        :param start_task_kwargs: Extra start replication task arguments
+        :type start_task_kwargs: Optional[dict]
+        """
+        additional_args = start_task_kwargs or {}
+        dms_client = self.get_conn()
+        dms_client.start_replication_task(
+            ReplicationTaskArn=replication_task_arn,
+            StartReplicationTaskType=start_replication_task_type,
+            **additional_args,
+        )
+

Review comment:
       Running and stopped waiters in DMS have unusual behaviour that is unintuitive for users:
   https://github.com/boto/botocore/blob/develop/botocore/data/dms/2016-01-01/waiters-2.json#L168
   https://github.com/boto/boto3/issues/1926
   
   There, the `ready` status is treated as a failure state by the `running` waiter.
   
   In the case of DMS, the most natural way of using the start task operator would be:
   "Create task" >> "Start task"
   
   If we add a running waiter to the start task function, it fails because of the `ready` state that the task transitions into. I tested it, and that was the result, so I opted not to use the waiter.
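One way around the waiter behaviour described above is to poll the task status yourself and decide which statuses count as failures. The sketch below is a hypothetical alternative, not what the PR does. `get_status` stands in for something like the hook's `get_task_status`, and the default failure set, delay and attempt count are assumptions chosen for illustration. Because `ready` and `starting` are not in the failure set, they are treated as transitional while waiting for `running`.

```python
import time
from typing import Callable, Iterable, Optional


def wait_for_status(
    get_status: Callable[[], Optional[str]],
    target: str,
    failure_statuses: Iterable[str] = ("failed", "deleting"),
    delay: float = 15.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll ``get_status`` until it returns ``target``.

    Statuses outside ``failure_statuses`` (for example 'ready' or 'starting'
    while waiting for 'running') are treated as transitional. ``None``, meaning
    the task was not found, is also treated as transitional.
    """
    failures = set(failure_statuses)
    for attempt in range(max_attempts):
        status = get_status()
        if status == target:
            return status
        if status in failures:
            raise RuntimeError(f"Reached failure status {status!r} while waiting for {target!r}")
        # Sleep between polls, but not after the final attempt.
        if attempt < max_attempts - 1:
            sleep(delay)
    raise TimeoutError(f"Status {target!r} not reached after {max_attempts} attempts")
```

In a hook this might be called as `wait_for_status(lambda: hook.get_task_status(arn), "running")`. Here `get_task_status` is the method from the quoted diff. The `sleep` parameter exists only so the loop can be tested without real delays.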




[GitHub] [airflow] potiuk commented on pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#issuecomment-890304214


   According to the changelog (https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/index.html#id1), it's in the 2.0.0 providers. Don't you see it there?


[GitHub] [airflow] Visya commented on a change in pull request #15850: Add AWS DMS replication task operators

Posted by GitBox <gi...@apache.org>.
Visya commented on a change in pull request #15850:
URL: https://github.com/apache/airflow/pull/15850#discussion_r642372966



##########
File path: airflow/providers/amazon/aws/hooks/dms.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class DmsHook(AwsBaseHook):
+    """Interact with AWS Database Migration Service."""
+
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        kwargs['client_type'] = 'dms'
+        super().__init__(*args, **kwargs)
+
+    def describe_replication_tasks(
+        self,
+        describe_tasks_kwargs: Optional[dict] = None,

Review comment:
       Thanks, will do!



