You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/09/28 07:17:01 UTC

[GitHub] [dolphinscheduler] Tianqi-Dotes commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Tianqi-Dotes commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r982020166


##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task start successfully.

Review Comment:
   after the DMS task starts successfully.



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data

Review Comment:
   Please write `JSON` in uppercase here instead of `json`.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");

Review Comment:
   This exception message says `sagemaker`, but this is the DMS task plugin; please change it to refer to DMS.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");
+        }
+    }
+
+    public int checkCreateReplicationTask() throws TaskException {
+
+        // if IsRestartTask, return success, do not create replication task
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        // if not IsRestartTask, create replication task
+        Boolean isCreateSuccessfully;
+        try {
+            isCreateSuccessfully = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        // if create replication task successfully, return EXIT_CODE_SUCCESS, else return EXIT_CODE_FAILURE
+        if (isCreateSuccessfully) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+    }
+
+    /**
+     * start replication task
+     *
+     * @return
+     * @throws TaskException
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * init dms hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+

Review Comment:
   format



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task start successfully.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/dms.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+**Create and start DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start.png)
+
+
+**Restart DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_restart.png)
+
+
+**Create and start DMS task by json data**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start_json.png)
+
+**Restart DMS task by json data**
+
+![dms](../../../../img/tasks/demo/dms_restart_json.png)
+
+
+
+### First, introduce some general parameters of DolphinScheduler
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select
+  the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in first-out order.

Review Comment:
   first-in-first-out



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data

Review Comment:
   Please also change the following occurrences of `json` to `JSON`.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");

Review Comment:
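   In this `else` branch the task hasn't been started yet (creating the replication task failed), so the message "DMS task failed to start" is misleading.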



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {

Review Comment:
   Formatting: add a space between `}` and `else` (`} else {`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org