You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/09/23 14:15:25 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

caishunfeng commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r978709244


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    /**
+     * Creates a hook whose DMS client is obtained from {@link #createClient()}.
+     */
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    /**
+     * Builds an AWS DMS client from the access key id, secret access key and region
+     * read through {@link PropertyUtils}.
+     *
+     * @return a newly built {@link AWSDatabaseMigrationService} client
+     */
+    public static AWSDatabaseMigrationService createClient() {
+        final String accessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String secretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String region = PropertyUtils.getString(TaskConstants.AWS_REGION);
+
+        // static provider wrapping the configured key pair
+        final AWSCredentialsProvider credentialsProvider =
+            new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey));
+
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withRegion(region)
+            .withCredentials(credentialsProvider)
+            .build();
+    }
+
+    /**
+     * Creates a replication task from the fields of this hook and waits for it to report
+     * {@code STATUS.READY}.
+     *
+     * <p>The table mappings and replication task settings are passed through
+     * {@code replaceFileParameters} before the request is sent (that helper is defined elsewhere
+     * in this class; presumably it resolves file references - TODO confirm).
+     *
+     * @return the result of {@code awaitReplicationTaskStatus(STATUS.READY)}
+     * @throws Exception declared by this method; the visible code does not show which call raises it
+     */
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        CreateReplicationTaskRequest request = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(tableMappings)
+            .withReplicationTaskSettings(replicationTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        // rewrite the two JSON-ish payloads before sending the request
+        request.setTableMappings(replaceFileParameters(request.getTableMappings()));
+        request.setReplicationTaskSettings(replaceFileParameters(request.getReplicationTaskSettings()));
+
+        CreateReplicationTaskResult result = client.createReplicationTask(request);
+        // remember the identifier and ARN returned by the service for the follow-up calls
+        replicationTaskIdentifier = result.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    /**
+     * Starts the replication task identified by {@code replicationTaskArn} and waits for it to
+     * report {@code STATUS.RUNNING}.
+     *
+     * @return the result of {@code awaitReplicationTaskStatus(STATUS.RUNNING)}
+     */
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest startRequest = new StartReplicationTaskRequest();
+        startRequest.setReplicationTaskArn(replicationTaskArn);
+        startRequest.setStartReplicationTaskType(startReplicationTaskType);
+        startRequest.setCdcStartTime(cdcStartTime);
+        startRequest.setCdcStartPosition(cdcStartPosition);
+        startRequest.setCdcStopPosition(cdcStopPosition);
+
+        StartReplicationTaskResult startResult = client.startReplicationTask(startRequest);
+        // keep the ARN reported by the service
+        replicationTaskArn = startResult.getReplicationTask().getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    /**
+     * Waits for the replication task to report {@code STATUS.STOPPED} and then checks its stop reason.
+     *
+     * @return {@code true} if the stop reason ends with {@code STATUS.FINISH_END_TOKEN};
+     *         {@code false} otherwise, including when the task reports no stop reason
+     */
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        String stopReason = describeReplicationTasks().getStopReason();
+        // the stop reason can be absent; report "not finished" instead of failing with a NullPointerException
+        return stopReason != null && stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    /**
+     * Stops the replication task and waits for it to report {@code STATUS.STOPPED}.
+     * Does nothing beyond logging when no replication task ARN is set.
+     */
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        // nothing to stop when no task ARN is known
+        if (replicationTaskArn != null) {
+            StopReplicationTaskRequest stopRequest = new StopReplicationTaskRequest();
+            stopRequest.setReplicationTaskArn(replicationTaskArn);
+            client.stopReplicationTask(stopRequest);
+            awaitReplicationTaskStatus(STATUS.STOPPED);
+        }
+    }
+
+    /**
+     * Deletes the replication task identified by {@code replicationTaskArn} and waits for the deletion.
+     *
+     * @return the result of {@code awaitReplicationTaskStatus(STATUS.DELETE)}, or {@code true}
+     *         when the wait fails with {@link ResourceNotFoundException}
+     */
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest deleteRequest = new DeleteReplicationTaskRequest();
+        deleteRequest.setReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(deleteRequest);
+        try {
+            return awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException notFound) {
+            // the task can no longer be described, which is treated as a successful deletion
+            return true;
+        }
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    /**
+     * Requests a connection test between a replication instance and an endpoint, then polls
+     * for the outcome.
+     *
+     * @param replicationInstanceArn ARN of the replication instance
+     * @param endpointArn            ARN of the endpoint to test
+     * @return the result of {@link #awaitConnectSuccess(String, String)}
+     */
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest testRequest = new TestConnectionRequest()
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(testRequest);
+        } catch (InvalidResourceStateException invalidState) {
+            // the rejection is only logged; the connection status is still polled below
+            logger.info(invalidState.getErrorMessage());
+        }
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    /**
+     * Polls the connection between a replication instance and an endpoint until it leaves the
+     * {@code STATUS.TESTING} state.
+     *
+     * @param replicationInstanceArn ARN of the replication instance
+     * @param endpointArn            ARN of the endpoint
+     * @return {@code true} once the connection reports {@code STATUS.SUCCESSFUL}; {@code false}
+     *         when it reports any other non-testing status, reports no status, or when no
+     *         matching connection is returned
+     */
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilters = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilters = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest request = new DescribeConnectionsRequest().withFilters(endpointFilters, instanceFilters)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult response = client.describeConnections(request);
+            // an empty result would otherwise fail with IndexOutOfBoundsException on get(0)
+            if (response.getConnections() == null || response.getConnections().isEmpty()) {
+                logger.info("No connection found for replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+                break;
+            }
+            String status = response.getConnections().get(0).getStatus();
+            // constant-first comparison so that a missing status is treated as a failure, not an NPE
+            if (STATUS.SUCCESSFUL.equals(status)) {
+                logger.info("Connect successful");
+                return true;
+            } else if (!STATUS.TESTING.equals(status)) {
+                break;
+            }
+        }
+        logger.info("Connect error");
+        return false;
+    }
+
+    /**
+     * Describes the replication task filtered by {@code replicationTaskArn} and returns the
+     * first task of the response.
+     *
+     * <p>As a side effect, the source endpoint ARN, target endpoint ARN, replication instance ARN
+     * and replication task ARN of this hook are filled in from the returned task when they are
+     * currently {@code null}.
+     *
+     * @return the first {@link ReplicationTask} returned by the service
+     */
+    public ReplicationTask describeReplicationTasks() {
+        Filter replicationTaskFilter = new Filter().withName(AWS_KEY.REPLICATION_TASK_ARN).withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest request = new DescribeReplicationTasksRequest().withFilters(replicationTaskFilter).withMaxRecords(20).withMarker("");
+        DescribeReplicationTasksResult result = client.describeReplicationTasks(request);
+        // NOTE(review): get(0) assumes the response contains at least one task - confirm
+        ReplicationTask replicationTask = result.getReplicationTasks().get(0);
+
+        // only fields that are still unset are taken over from the described task
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = replicationTask.getSourceEndpointArn();
+        }
+
+        if (targetEndpointArn == null) {
+            targetEndpointArn = replicationTask.getTargetEndpointArn();
+        }
+
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = replicationTask.getReplicationInstanceArn();
+        }
+
+        if (replicationTaskArn == null) {
+            replicationTaskArn = replicationTask.getReplicationTaskArn();
+        }
+
+        return replicationTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");

Review Comment:
   Remove this log statement if it is not needed.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/pom.xml:
##########
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-dms</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-dms</artifactId>
+            <version>1.12.297</version>
+        </dependency>

Review Comment:
   Move the version to the BOM.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    /**
+     * Statically mocks {@link DmsHook} so that {@code DmsHook.createClient()} returns a mocked
+     * {@link AWSDatabaseMigrationService} instead of building a real client.
+     */
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        // answer lazily so the stub always hands out the current mock held in the field
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");

Review Comment:
   Use a constant for the string `arn:aws:dms:ap-southeast-1:123456789012:task:task`.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    /**
+     * Creates the task and keeps a reference to its execution context.
+     *
+     * @param taskExecutionContext execution context of this task, also passed to the superclass
+     */
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    /**
+     * Parses the task parameters from the execution context into {@link DmsParameters} and
+     * initializes the DMS hook.
+     *
+     * @throws TaskException if the hook cannot be initialized
+     */
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    /**
+     * Always returns an empty list.
+     *
+     * @return an empty, immutable list
+     */
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    /**
+     * Tracks the replication task until it has finished.
+     *
+     * <p>For a cdc migration without a cdcStopPosition the task is reported as successful
+     * immediately, without waiting for the replication task to stop.
+     *
+     * @throws TaskException if the app id is missing or the replication task did not finish successfully
+     */
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+
+        // without a CdcStopPosition there is nothing to wait for
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean finishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (!finishedSuccessfully) {
+            throw new TaskException("DMS task failed to track");
+        }
+        exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * Initializes the DMS remote app id if it is still null, by parsing the stored app ids.
+     *
+     * @throws TaskException if no app id is available afterwards
+     */
+    private void initAppId() {
+        if (appId == null && StringUtils.isNotEmpty(getAppIds())) {
+            appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+        }
+        if (appId == null) {
+            // the message previously named "sagemaker", a leftover from another plugin
+            throw new TaskException("DMS applicationID is null");
+        }
+    }
+
+    /**
+     * Creates the replication task unless the task is configured as a restart.
+     *
+     * @return {@code EXIT_CODE_SUCCESS} for a restart or a successful creation,
+     *         {@code EXIT_CODE_FAILURE} when the creation did not succeed
+     * @throws TaskException if creating the replication task raises an exception
+     */
+    public int checkCreateReplicationTask() throws TaskException {
+        // a restart reuses the existing replication task, so nothing is created
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        Boolean created;
+        try {
+            created = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        return created ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE;
+    }
+
+    /**
+     * Starts the replication task, retrying once when the start is rejected because of a
+     * connection test problem.
+     *
+     * <p>If the first start fails with {@link InvalidResourceStateException} whose message
+     * contains "Test connection", the endpoint connections are tested again and, when they are
+     * ok, the start is attempted a second time. Any other {@link InvalidResourceStateException}
+     * results in a failure exit code.
+     *
+     * @return {@code EXIT_CODE_SUCCESS} if the replication task was started,
+     *         otherwise {@code EXIT_CODE_FAILURE}
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * Creates the DMS hook and copies the task parameters onto it.
+     *
+     * <p>When no start type is given in the parameters, a default is chosen:
+     * {@code RELOAD_TARGET} for a restart, {@code START_REPLICATION} otherwise.
+     *
+     * @throws TaskException if the parameters cannot be copied to the hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+
+        // an explicitly configured start type has already been copied onto the hook
+        if (StringUtils.isNotEmpty(parameters.getStartReplicationTaskType())) {
+            return;
+        }
+
+        if (parameters.getIsRestartTask()) {
+            dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.RELOAD_TARGET);
+            return;
+        }
+        dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.START_REPLICATION);
+    }
+
+    /**
+     * convert json parameters to dms parameters
+     */
+    public void convertJsonParameters() throws TaskException {
+        // create a new parameter object using the json data if the json data is not empty
+        if (parameters.getIsJsonFormat() && parameters.getJsonData() != null) {
+            // combining local and global parameters
+            String jsonData = ParameterUtils.convertParameterPlaceholders(parameters.getJsonData(), ParamUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+
+            boolean isRestartTask = parameters.getIsRestartTask();
+            try {
+                parameters = objectMapper.readValue(jsonData, DmsParameters.class);
+                parameters.setIsRestartTask(isRestartTask);
+            } catch (Exception e) {
+                logger.error("Failed to convert json data to DmsParameters object, error message: {}", e.getMessage());

Review Comment:
   ```suggestion
                   logger.error("Failed to convert json data to DmsParameters object", e);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsParameters.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+
+import lombok.Data;
+
+@Data
+public class DmsParameters extends AbstractParameters {
+
+    private Boolean isRestartTask = false;
+    private Boolean isJsonFormat = false;
+    private String jsonData;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    @Override
+    public boolean checkParameters() {
+        boolean flag;
+        if (isJsonFormat) {
+            flag = jsonData != null;
+        } else if (isRestartTask) {
+            flag = (replicationTaskArn != null);

Review Comment:
   ```suggestion
               flag = replicationTaskArn != null;
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTaskTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    PropertyUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsTaskTest {
+
+    @Mock
+    DmsHook dmsHook;
+
+    DmsTask dmsTask;
+
+    /**
+     * Builds the task under test and makes sure it works against the mocked {@link DmsHook}.
+     */
+    @Before
+    public void before() throws Exception {
+        // every "new DmsHook(...)" executed in a prepared class returns the mock
+        whenNew(DmsHook.class).withAnyArguments().thenReturn(dmsHook);
+        DmsParameters dmsParameters = new DmsParameters();
+        // initTask is defined outside this excerpt; presumably it wraps the parameters in a DmsTask - TODO confirm
+        dmsTask = initTask(dmsParameters);
+        dmsTask.initDmsHook();
+        // overwrite the hook created by initDmsHook() with the mock via reflection
+        MemberModifier.field(DmsTask.class, "dmsHook").set(dmsTask, dmsHook);
+    }
+
+    @Test
+    public void testCreateTaskJson() {

Review Comment:
   It seems to be a meaningless test case.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    /**
+     * Mapper used by convertJsonParameters() to read the user supplied JSON into {@link DmsParameters}.
+     * It ignores unknown properties, accepts empty arrays as null objects, reads unknown enum values
+     * as null, requires setters for getters and uses upper camel case property names.
+     */
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * Execution context of this task; provides the raw task parameters and the prepared parameter map.
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    // hook wrapping the DMS client calls; created in initDmsHook()
+    public DmsHook dmsHook;
+    /**
+     * DMS parameters parsed in init(); may be replaced by convertJsonParameters().
+     */
+    private DmsParameters parameters;
+    // identifiers of the remote replication task; set in submitApplication() or restored in initAppId()
+    private DmsHook.ApplicationIds appId;
+
+    /**
+     * Creates a DMS task bound to the given execution context.
+     *
+     * @param taskExecutionContext context handed to the parent task and kept for later parameter lookup
+     */
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    /**
+     * Always returns an empty list.
+     *
+     * @return an empty list
+     */
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    /**
+     * Tracks the replication task until it has finished and sets the exit status code.
+     *
+     * <p>A cdc task without a cdcStopPosition is reported as successful right away and is not
+     * tracked any further.
+     *
+     * @throws TaskException if the application id is missing or the task did not finish successfully
+     */
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+
+        if (isStopTaskWhenCdc()) {
+            // nothing to wait for: the running status is not checked any further
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        if (!dmsHook.checkFinishedReplicationTask()) {
+            throw new TaskException("DMS task failed to track");
+        }
+        exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * Restores the remote DMS application id when it has not been set on this instance yet.
+     *
+     * <p>The id is parsed from the JSON string returned by {@code getAppIds()} if that string is
+     * not empty.
+     *
+     * @throws TaskException if no application id is available after the restore attempt
+     */
+    private void initAppId() {
+        if (appId == null && StringUtils.isNotEmpty(getAppIds())) {
+            appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+        }
+        if (appId == null) {
+            // the message used to name "sagemaker", which is not the service handled by this task
+            throw new TaskException("DMS applicationID is null");
+        }
+    }
+
+    /**
+     * Creates the replication task unless the task is configured as a restart.
+     *
+     * @return EXIT_CODE_SUCCESS for a restart or a successful creation, otherwise EXIT_CODE_FAILURE
+     * @throws TaskException if creating the replication task raises an exception
+     */
+    public int checkCreateReplicationTask() throws TaskException {
+        // a restart reuses the existing replication task, nothing has to be created
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        Boolean created;
+        try {
+            created = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        return created ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE;
+    }
+
+    /**
+     * start replication task
+     *
+     * @return
+     * @throws TaskException
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * init dms hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+
+
+        if (!StringUtils.isNotEmpty(parameters.getStartReplicationTaskType())) {
+            if (parameters.getIsRestartTask()) {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.RELOAD_TARGET);
+            } else {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.START_REPLICATION);
+            }
+        }
+    }
+
+    /**
+     * convert json parameters to dms parameters
+     */
+    public void convertJsonParameters() throws TaskException {
+        // create a new parameter object using the json data if the json data is not empty
+        if (parameters.getIsJsonFormat() && parameters.getJsonData() != null) {
+            // combining local and global parameters
+            String jsonData = ParameterUtils.convertParameterPlaceholders(parameters.getJsonData(), ParamUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+
+            boolean isRestartTask = parameters.getIsRestartTask();
+            try {
+                parameters = objectMapper.readValue(jsonData, DmsParameters.class);
+                parameters.setIsRestartTask(isRestartTask);
+            } catch (Exception e) {
+                logger.error("Failed to convert json data to DmsParameters object, error message: {}", e.getMessage());
+                throw new TaskException(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Returns the current DMS parameters of this task.
+     *
+     * @return the parameters field; it is assigned in init() and may be replaced by convertJsonParameters()
+     */
+    @Override
+    public DmsParameters getParameters() {
+        return parameters;
+    }
+
+    @Override
+    public void cancelApplication() {
+        dmsHook.stopReplicationTask();
+//        dmsHook.deleteReplicationTask();

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    /**
+     * Creates a hook whose client is obtained from {@link #createClient()}.
+     */
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    /**
+     * Builds a DMS client from the AWS access key id, secret access key and region read through
+     * {@link PropertyUtils}.
+     *
+     * @return the DMS client
+     */
+    public static AWSDatabaseMigrationService createClient() {
+        final String accessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String secretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String region = PropertyUtils.getString(TaskConstants.AWS_REGION);
+
+        final AWSCredentialsProvider credentialsProvider =
+            new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey));
+
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withRegion(region)
+            .withCredentials(credentialsProvider)
+            .build();
+    }
+
+    /**
+     * Creates the replication task from the fields of this hook and waits for the ready status.
+     *
+     * <p>The table mappings and the replication task settings are passed through
+     * replaceFileParameters before the request is sent. The identifier and the arn of the
+     * created task are stored on this hook.
+     *
+     * @return the result of waiting for the ready status
+     * @throws Exception if preparing the request or waiting for the status fails
+     */
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        String resolvedTableMappings = replaceFileParameters(tableMappings);
+        String resolvedTaskSettings = replaceFileParameters(replicationTaskSettings);
+
+        CreateReplicationTaskRequest createRequest = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(resolvedTableMappings)
+            .withReplicationTaskSettings(resolvedTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        CreateReplicationTaskResult createResult = client.createReplicationTask(createRequest);
+        // remember what the service reports so that later calls address the created task
+        replicationTaskIdentifier = createResult.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = createResult.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    /**
+     * Starts the replication task addressed by replicationTaskArn and waits for the running status.
+     *
+     * <p>The arn reported in the start result is stored on this hook.
+     *
+     * @return the result of waiting for the running status
+     */
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest startRequest = new StartReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn)
+            .withStartReplicationTaskType(startReplicationTaskType)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition);
+
+        ReplicationTask startedTask = client.startReplicationTask(startRequest).getReplicationTask();
+        replicationTaskArn = startedTask.getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    /**
+     * Waits for the stopped status and inspects the stop reason of the replication task.
+     *
+     * @return true if the stop reason ends with the finish token, else false
+     */
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        ReplicationTask stoppedTask = describeReplicationTasks();
+        String stopReason = stoppedTask.getStopReason();
+        return stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    /**
+     * Stops the replication task and waits for the stopped status; does nothing besides logging
+     * when no replication task arn is known.
+     */
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        if (replicationTaskArn != null) {
+            StopReplicationTaskRequest stopRequest = new StopReplicationTaskRequest()
+                .withReplicationTaskArn(replicationTaskArn);
+            client.stopReplicationTask(stopRequest);
+            awaitReplicationTaskStatus(STATUS.STOPPED);
+        }
+    }
+
+    /**
+     * Deletes the replication task addressed by replicationTaskArn and waits for the delete status.
+     *
+     * @return the result of waiting for the delete status, or true when the task is reported as
+     *         not found while waiting
+     */
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest deleteRequest = new DeleteReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(deleteRequest);
+        try {
+            return awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException e) {
+            // the task can no longer be found, which is treated as a successful deletion
+            return true;
+        }
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    /**
+     * Requests a connection test between a replication instance and an endpoint and waits for its result.
+     *
+     * <p>An {@link InvalidResourceStateException} raised by the request is only logged; the result
+     * is still awaited afterwards.
+     *
+     * @param replicationInstanceArn arn of the replication instance
+     * @param endpointArn arn of the endpoint
+     * @return the result of {@link #awaitConnectSuccess(String, String)}
+     */
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest testRequest = new TestConnectionRequest()
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(testRequest);
+        } catch (InvalidResourceStateException e) {
+            logger.info(e.getErrorMessage());
+        }
+
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    /**
+     * Polls the connection between a replication instance and an endpoint until its test has left
+     * the testing status.
+     *
+     * <p>Only the first connection of each describe response is inspected.
+     *
+     * @param replicationInstanceArn arn of the replication instance
+     * @param endpointArn arn of the endpoint
+     * @return true if the connection status is successful, false for any other status except testing
+     */
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilter = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilter = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest describeRequest = new DescribeConnectionsRequest()
+            .withFilters(endpointFilter, instanceFilter)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult describeResult = client.describeConnections(describeRequest);
+            String connectionStatus = describeResult.getConnections().get(0).getStatus();
+            if (connectionStatus.equals(STATUS.SUCCESSFUL)) {
+                logger.info("Connect successful");
+                return true;
+            }
+            if (!connectionStatus.equals(STATUS.TESTING)) {
+                // neither successful nor still testing
+                logger.info("Connect error");
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Describes the replication task filtered by replicationTaskArn and fills in arns that are
+     * still unset on this hook.
+     *
+     * <p>Only the first task of the response is used. Source endpoint, target endpoint,
+     * replication instance and replication task arn are taken from it when the corresponding
+     * field of this hook is null.
+     *
+     * @return the first replication task of the describe response
+     */
+    public ReplicationTask describeReplicationTasks() {
+        Filter taskArnFilter = new Filter()
+            .withName(AWS_KEY.REPLICATION_TASK_ARN)
+            .withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest describeRequest = new DescribeReplicationTasksRequest()
+            .withFilters(taskArnFilter)
+            .withMaxRecords(20)
+            .withMarker("");
+        DescribeReplicationTasksResult describeResult = client.describeReplicationTasks(describeRequest);
+        ReplicationTask describedTask = describeResult.getReplicationTasks().get(0);
+
+        // values already present on the hook are never overwritten
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = describedTask.getSourceEndpointArn();
+        }
+        if (targetEndpointArn == null) {
+            targetEndpointArn = describedTask.getTargetEndpointArn();
+        }
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = describedTask.getReplicationInstanceArn();
+        }
+        if (replicationTaskArn == null) {
+            replicationTaskArn = describedTask.getReplicationTaskArn();
+        }
+
+        return describedTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");
+                return true;
+            } else if (stopStatusSet.contains(status)) {
+                break;
+            }
+        }
+        logger.info("error");

Review Comment:
   Remove it if it is useless.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    /**
+     * Replaces the static client factory of {@link DmsHook} so that every hook created in a test
+     * uses the mocked client.
+     */
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        // DmsHook.createClient() now hands out the mock
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    /**
+     * Creating a task succeeds once the described task reports the ready status, and the arn and
+     * identifier of the created task are stored on the hook.
+     */
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        // the same mocked task serves as create result and as describe result
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    /**
+     * Starting a task succeeds once the described task reports the running status, and the arn of
+     * the started task is stored on the hook.
+     */
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        // the same mocked task serves as start result and as describe result
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    /**
+     * A stopped task counts as finished for the stop reason "*_FINISHED" and as not finished for
+     * the stop reason "*_ERROR".
+     */
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        // stop reason reported as finished
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        // stop reason reported as error
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    /**
+     * Deleting a task succeeds once the described task reports the delete status.
+     */
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    /**
+     * The endpoint test passes only when both the source and the target connection test pass;
+     * all four combinations of reachable and unreachable endpoints are checked.
+     */
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        // the single connection test is stubbed per endpoint arn
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        // source ok, target ok
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        // source fails, target fails
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        // source ok, target fails
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        // source fails, target ok
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    /**
+     * Verifies that {@code describeReplicationTasks()} hands back the task contained in the
+     * describe result returned by the mocked client.
+     */
+    @Test
+    public void testDescribeReplicationTasks() {
+        final String taskArn = "arn:aws:dms:ap-southeast-1:123456789012:task:task";
+        final String taskIdentifier = "task";
+        final String sourceArn = "arn:aws:dms:ap-southeast-1:123456789012:endpoint:source";
+        final String targetArn = "arn:aws:dms:ap-southeast-1:123456789012:endpoint:target";
+
+        DmsHook hook = new DmsHook();
+        hook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        // The single task the mocked client will report.
+        ReplicationTask stubbedTask = mock(ReplicationTask.class);
+        when(stubbedTask.getReplicationTaskArn()).thenReturn(taskArn);
+        when(stubbedTask.getReplicationTaskIdentifier()).thenReturn(taskIdentifier);
+        when(stubbedTask.getSourceEndpointArn()).thenReturn(sourceArn);
+        when(stubbedTask.getTargetEndpointArn()).thenReturn(targetArn);
+
+        DescribeReplicationTasksResult describeResult = mock(DescribeReplicationTasksResult.class);
+        when(describeResult.getReplicationTasks()).thenReturn(Arrays.asList(stubbedTask));
+        when(client.describeReplicationTasks(any())).thenReturn(describeResult);
+
+        ReplicationTask described = hook.describeReplicationTasks();
+
+        Assert.assertNotEquals(hook.getReplicationInstanceArn(), described.getReplicationTaskArn());
+        Assert.assertEquals(taskIdentifier, described.getReplicationTaskIdentifier());
+        Assert.assertEquals(sourceArn, described.getSourceEndpointArn());
+        Assert.assertEquals(targetArn, described.getTargetEndpointArn());
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    /**
+     * Checks {@code replaceFileParameters(...)} for two kinds of input: a parameter built
+     * from the "file://" prefix plus the path of the {@code table_mapping.json} test
+     * resource, and a plain JSON string.
+     *
+     * @throws IOException declared because the method under test and {@code loadJson} read files
+     */
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();
+
+        String jsonData = loadJson("table_mapping.json");
+
+        DmsHook dmsHook = new DmsHook();
+
+        // A "file://" parameter is expected to be replaced by the content of the referenced file.
+        String pathParameter = "file://" + path;
+        Assert.assertEquals(jsonData, dmsHook.replaceFileParameters(pathParameter));
+
+        // A parameter without the "file://" prefix is expected to come back unchanged.
+        String inlineParameter = "{}";
+        Assert.assertEquals(inlineParameter, dmsHook.replaceFileParameters(inlineParameter));
+    }
+
+//    this.getClass().getResourceAsStream("SagemakerRequestJson.json"))

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    /**
+     * Prepares each test: creates a mocked {@code AWSDatabaseMigrationService}, stores it in
+     * {@code client}, and stubs the static {@code DmsHook.createClient()} to answer with it.
+     */
+    @Before
+    public void before() {
+        client = mock(AWSDatabaseMigrationService.class);
+
+        mockStatic(DmsHook.class);
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    /**
+     * Verifies that {@code describeReplicationTasks()} hands back the task contained in the
+     * describe result returned by the mocked client.
+     */
+    @Test
+    public void testDescribeReplicationTasks() {
+        final String taskArn = "arn:aws:dms:ap-southeast-1:123456789012:task:task";
+        final String taskIdentifier = "task";
+        final String sourceArn = "arn:aws:dms:ap-southeast-1:123456789012:endpoint:source";
+        final String targetArn = "arn:aws:dms:ap-southeast-1:123456789012:endpoint:target";
+
+        DmsHook hook = new DmsHook();
+        hook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        // The single task the mocked client will report.
+        ReplicationTask stubbedTask = mock(ReplicationTask.class);
+        when(stubbedTask.getReplicationTaskArn()).thenReturn(taskArn);
+        when(stubbedTask.getReplicationTaskIdentifier()).thenReturn(taskIdentifier);
+        when(stubbedTask.getSourceEndpointArn()).thenReturn(sourceArn);
+        when(stubbedTask.getTargetEndpointArn()).thenReturn(targetArn);
+
+        DescribeReplicationTasksResult describeResult = mock(DescribeReplicationTasksResult.class);
+        when(describeResult.getReplicationTasks()).thenReturn(Arrays.asList(stubbedTask));
+        when(client.describeReplicationTasks(any())).thenReturn(describeResult);
+
+        ReplicationTask described = hook.describeReplicationTasks();
+
+        Assert.assertNotEquals(hook.getReplicationInstanceArn(), described.getReplicationTaskArn());
+        Assert.assertEquals(taskIdentifier, described.getReplicationTaskIdentifier());
+        Assert.assertEquals(sourceArn, described.getSourceEndpointArn());
+        Assert.assertEquals(targetArn, described.getTargetEndpointArn());
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();
+
+        String jsonData = loadJson("table_mapping.json");
+
+        DmsHook dmsHook = new DmsHook();
+
+        String pathParameter = "file://" + path;
+        Assert.assertEquals(jsonData, dmsHook.replaceFileParameters(pathParameter));
+
+//        String pathParameter2 = "file://" + "not_exist.json";
+//
+//        try {
+//            Assert.assertEquals(pathParameter2, dmsHook.replaceFileParameters(pathParameter2));
+//        }catch (Exception e) {
+//            Assert.assertTrue(e instanceof IOException);
+//        }

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org