You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/15 15:25:27 UTC

[incubator-inlong] branch release-1.1.0 updated: [INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3742)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/release-1.1.0 by this push:
     new 71a13b499 [INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3742)
71a13b499 is described below

commit 71a13b499afa927f588d16e0b87aa9443dadf408
Author: healchow <he...@gmail.com>
AuthorDate: Fri Apr 15 23:25:23 2022 +0800

    [INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3742)
---
 .../inlong/manager/common/util/JsonUtils.java      |  31 +--
 .../manager/plugin/flink/FlinkOperation.java       | 228 ++++++++++++++++++++
 .../inlong/manager/plugin/flink/FlinkService.java  | 221 +++++++++-----------
 ...nTaskRunner.java => IntegrationTaskRunner.java} |  43 ++--
 .../manager/plugin/flink/ManagerFlinkTask.java     | 231 ---------------------
 .../manager/plugin/flink/dto/JarEntryInfo.java     |   2 +-
 .../{JarRunRequestbody.java => JarRunRequest.java} |   4 +-
 .../inlong/manager/plugin/flink/dto/LoginConf.java |   2 +-
 ...uestBody.java => StopWithSavepointRequest.java} |   2 +-
 .../plugin/flink/enums/BusinessExceptionDesc.java  |  37 ----
 .../plugin/flink/{ => enums}/Constants.java        |   2 +-
 .../plugin/listener/DeleteSortListener.java        |  67 +++---
 .../plugin/listener/RestartSortListener.java       |  89 ++++----
 .../plugin/listener/StartupSortListener.java       | 120 +++++------
 .../plugin/listener/SuspendSortListener.java       |  66 +++---
 .../manager/plugin/util/FlinkConfiguration.java    |  82 ++++----
 .../inlong/manager/plugin/util/FlinkUtils.java     |  39 +---
 .../plugin/listener/DeleteSortListenerTest.java    |   2 +-
 .../workflow/util/WorkflowFormParserUtils.java     |   4 +-
 19 files changed, 606 insertions(+), 666 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
index 1d0dbb269..5aa6bccb7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
@@ -26,25 +26,26 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.exceptions.JsonException;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 /**
  * JSON utils
  */
 @Slf4j
 public class JsonUtils {
 
-    public static final ObjectMapper MAPPER = new ObjectMapper();
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     static {
-        MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
-        MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
-        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
+        OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
 
     /**
@@ -61,7 +62,7 @@ public class JsonUtils {
             return (String) obj;
         }
         try {
-            return MAPPER.writeValueAsString(obj);
+            return OBJECT_MAPPER.writeValueAsString(obj);
         } catch (JsonProcessingException e) {
             log.error("JSON transform error: {}", obj, e);
             throw new JsonException("JSON transform error");
@@ -78,7 +79,7 @@ public class JsonUtils {
      */
     public static <T> T parse(String json, TypeReference<T> type) {
         try {
-            return MAPPER.readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
@@ -96,7 +97,7 @@ public class JsonUtils {
             return null;
         }
         try {
-            return MAPPER.readTree(json);
+            return OBJECT_MAPPER.readTree(json);
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
@@ -113,7 +114,7 @@ public class JsonUtils {
      */
     public static <T> T parse(String json, Class<T> tClass) {
         try {
-            return MAPPER.readValue(json, tClass);
+            return OBJECT_MAPPER.readValue(json, tClass);
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
@@ -122,7 +123,7 @@ public class JsonUtils {
 
     public static <T> T parse(String json, JavaType javaType) {
         try {
-            return MAPPER.readValue(json, javaType);
+            return OBJECT_MAPPER.readValue(json, javaType);
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
@@ -139,7 +140,8 @@ public class JsonUtils {
      */
     public static <E> List<E> parseList(String json, Class<E> eClass) {
         try {
-            return MAPPER.readValue(json, MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
+            return OBJECT_MAPPER.readValue(json,
+                    OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
@@ -158,7 +160,8 @@ public class JsonUtils {
      */
     public static <K, V> Map<K, V> parseMap(String json, Class<K> kClass, Class<V> vClass) {
         try {
-            return MAPPER.readValue(json, MAPPER.getTypeFactory().constructMapType(Map.class, kClass, vClass));
+            return OBJECT_MAPPER.readValue(json,
+                    OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, kClass, vClass));
         } catch (IOException e) {
             log.error("JSON transform error: {}", json, e);
             throw new JsonException("JSON transform error");
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
new file mode 100644
index 000000000..c061f5590
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
+import org.apache.inlong.manager.plugin.util.FlinkUtils;
+
+import java.io.File;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
+
+/**
+ * Flink task operation.
+ */
+@Slf4j
+public class FlinkOperation {
+
+    private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
+            + "or task already terminated or savepoint path is null";
+    private static final String INLONG_MANAGER = "inlong-manager";
+    private static final String INLONG_SORT = "inlong-sort";
+    private static final String SORT_JAR_PATTERN = "^sort-single-tenant.*jar$";
+
+    private final FlinkService flinkService;
+
+    public FlinkOperation(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    /**
+     * Start the Flink job, if the job id was not empty, restore it.
+     */
+    public void start(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        try {
+            // Start a new task without savepoint
+            if (StringUtils.isEmpty(jobId)) {
+                IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo,
+                        TaskCommitType.START_NOW.getCode());
+                Future<?> future = TaskRunService.submit(taskRunner);
+                future.get();
+            } else {
+                // Restore an old task with savepoint
+                boolean noSavepoint = isNullOrTerminated(jobId) || StringUtils.isEmpty(flinkInfo.getSavepointPath());
+                if (noSavepoint) {
+                    String message = String.format("restore job failed, as " + JOB_TERMINATED_MSG, jobId);
+                    log.error(message);
+                    throw new Exception(message);
+                }
+
+                IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo,
+                        TaskCommitType.RESUME.getCode());
+                Future<?> future = TaskRunService.submit(taskRunner);
+                future.get();
+            }
+        } catch (Exception e) {
+            log.warn("submit flink job failed for {}", flinkInfo, e);
+            throw new Exception("submit flink job failed: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Build Flink local path.
+     */
+    public void genPath(FlinkInfo flinkInfo, String dataflow) throws Exception {
+        String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+        log.info("gen path from {}", path);
+
+        int index = path.indexOf(INLONG_MANAGER);
+        if (index == -1) {
+            throw new Exception(INLONG_MANAGER + " path not found in " + path);
+        }
+
+        path = path.substring(0, path.lastIndexOf(File.separator));
+        String startPath = path.substring(0, index);
+        String basePath = startPath + INLONG_SORT;
+        File file = new File(basePath);
+        if (!file.exists()) {
+            String message = String.format("file path [%s] not found", basePath);
+            log.error(message);
+            throw new Exception(message);
+        }
+
+        String jarPath = findFiles(basePath, SORT_JAR_PATTERN);
+        flinkInfo.setLocalJarPath(jarPath);
+        log.info("get sort jar path success, path: {}", jarPath);
+
+        if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), dataflow)) {
+            flinkInfo.setLocalConfPath(path + File.separator + flinkInfo.getJobName());
+        } else {
+            String message = String.format("write dataflow to %s failed", path);
+            log.error(message + ", dataflow: {}", dataflow);
+            throw new Exception(message);
+        }
+    }
+
+    /**
+     * Restart the Flink job.
+     */
+    public void restart(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        boolean terminated = isNullOrTerminated(jobId);
+        if (terminated) {
+            String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
+            log.error(message);
+            throw new Exception(message);
+        }
+
+        Future<?> future = TaskRunService.submit(new IntegrationTaskRunner(flinkService, flinkInfo,
+                TaskCommitType.RESTART.getCode()));
+        future.get();
+    }
+
+    /**
+     * Stop the Flink job.
+     */
+    public void stop(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        boolean terminated = isNullOrTerminated(jobId);
+        if (terminated) {
+            String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, jobId);
+            log.error(message);
+            throw new Exception(message);
+        }
+
+        Future<?> future = TaskRunService.submit(
+                new IntegrationTaskRunner(flinkService, flinkInfo,
+                        TaskCommitType.STOP.getCode()));
+        future.get();
+    }
+
+    public void delete(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+        if (jobDetailsInfo == null) {
+            throw new Exception(String.format("delete job failed as the job not found for %s", jobId));
+        }
+
+        JobStatus jobStatus = jobDetailsInfo.getJobStatus();
+        if (jobStatus != null && jobStatus.isTerminalState()) {
+            String message = String.format("not support delete %s as the task was terminated", jobId);
+            message = jobStatus.isGloballyTerminalState() ? message + " globally" : message + " locally";
+            throw new Exception(message);
+        }
+
+        Future<?> future = TaskRunService.submit(
+                new IntegrationTaskRunner(flinkService, flinkInfo,
+                        TaskCommitType.DELETE.getCode()));
+        future.get();
+    }
+
+    public void pollJobStatus(FlinkInfo flinkInfo) throws Exception {
+        if (flinkInfo.isException()) {
+            throw new BusinessException("startup failed: " + flinkInfo.getExceptionMsg());
+        }
+        String jobId = flinkInfo.getJobId();
+        if (StringUtils.isBlank(jobId)) {
+            log.error("job id cannot be empty for {}", flinkInfo);
+            throw new Exception("job id cannot be empty");
+        }
+
+        while (true) {
+            try {
+                JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+                if (jobDetailsInfo == null) {
+                    log.error("job detail not found by {}", jobId);
+                    throw new Exception(String.format("job detail not found by %s", jobId));
+                }
+
+                JobStatus jobStatus = jobDetailsInfo.getJobStatus();
+                if (jobStatus.isTerminalState()) {
+                    log.error("job was terminated for {}, exception: {}", jobId, flinkInfo.getExceptionMsg());
+                    throw new Exception("job was terminated for " + jobId);
+                }
+
+                if (jobStatus == RUNNING) {
+                    log.info("job status is Running for {}", jobDetailsInfo);
+                    break;
+                }
+                log.info("job was not Running for {}", jobDetailsInfo);
+                TimeUnit.SECONDS.sleep(5);
+            } catch (Exception e) {
+                log.error("poll job status error for {}, exception: ", flinkInfo, e);
+            }
+        }
+    }
+
+    /**
+     * Check whether the job was terminated by the given job id.
+     */
+    private boolean isNullOrTerminated(String jobId) throws Exception {
+        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+        boolean terminated = jobDetailsInfo == null || jobDetailsInfo.getJobStatus() == null;
+        if (terminated) {
+            log.warn("job detail or job status was null for [{}]", jobId);
+            return terminated;
+        }
+
+        terminated = jobDetailsInfo.getJobStatus().isTerminalState();
+        log.warn("job terminated state was [{}] for [{}]", terminated, jobDetailsInfo);
+        return terminated;
+    }
+
+}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 6dbe835e0..61c8d1c7d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -31,14 +31,15 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequestBody;
+import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -50,219 +51,191 @@ import java.util.regex.Pattern;
 @Slf4j
 public class FlinkService {
 
-    private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
+    private static final Pattern IP_PORT_PATTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
 
     private final FlinkConfig flinkConfig;
-    private final Integer port;
-    private final Integer jobManagerPort;
-    private final String  address;
     private final Integer parallelism;
     private final String savepointDirectory;
+    private final Configuration configuration;
 
-    public FlinkService(String endpoint) throws IOException {
+    public FlinkService(String endpoint) throws Exception {
         FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
         flinkConfig = flinkConfiguration.getFlinkConfig();
-        jobManagerPort = flinkConfig.getJobManagerPort();
         parallelism = flinkConfig.getParallelism();
         savepointDirectory = flinkConfig.getSavepointDirectory();
+
+        configuration = new Configuration();
+        Integer jobManagerPort = flinkConfig.getJobManagerPort();
+        configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+        Integer port;
+        String address;
         if (StringUtils.isEmpty(endpoint)) {
             address = flinkConfig.getAddress();
             port = flinkConfig.getPort();
         } else {
-            address = translateFromEndpont(endpoint).get("address");
-            port = Integer.valueOf(translateFromEndpont(endpoint).get("port"));
+            Map<String, String> ipPort = translateFromEndpoint(endpoint);
+            if (ipPort.isEmpty()) {
+                throw new BusinessException("get address:port failed from endpoint " + endpoint);
+            }
+            address = ipPort.get("address");
+            port = Integer.valueOf(ipPort.get("port"));
         }
+        configuration.setString(JobManagerOptions.ADDRESS, address);
+        configuration.setInteger(RestOptions.PORT, port);
     }
 
     /**
-     * translate  Endpont to address & port
-     *
-     * @param endpoint
-     * @return
+     * Translate the Endpoint to address & port
      */
-    private Map<String, String> translateFromEndpont(String endpoint) {
+    private Map<String, String> translateFromEndpoint(String endpoint) throws Exception {
         Map<String, String> map = new HashMap<>(2);
-        try {
-            Matcher matcher = numberPattern.matcher(endpoint);
-            while (matcher.find()) {
-                map.put("address", matcher.group(1));
-                map.put("port", matcher.group(2));
-                return map;
-            }
-        } catch (Exception e) {
-            log.error("fetch address:port fail: ", e);
+        Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
+        if (matcher.find()) {
+            map.put("address", matcher.group(1));
+            map.put("port", matcher.group(2));
+            return map;
+        } else {
+            throw new Exception("endpoint [" + endpoint + "] did not match address:port");
         }
-        return map;
     }
 
     /**
-     * get flinkConfig
-     * @return
+     * Get Flink config.
      */
     public FlinkConfig getFlinkConfig() {
         return flinkConfig;
     }
 
     /**
-     * get flink Client
-     * @return
-     * @throws Exception
+     * Get the Flink Client.
      */
     public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
-        Configuration configuration = initConfiguration();
-        RestClusterClient<StandaloneClusterId> client =
-                new RestClusterClient<StandaloneClusterId>(configuration, StandaloneClusterId.getInstance());
-        return client;
-
-    }
-
-    /**
-     * init flink-client Configuration
-     * @return
-     */
-    public Configuration initConfiguration()  {
-        Configuration configuration = new Configuration();
-        configuration.setString(JobManagerOptions.ADDRESS, address);
-        configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
-        configuration.setInteger(RestOptions.PORT, port);
-        return configuration;
-
+        try {
+            return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            log.error("get flink client failed: ", e);
+            throw new Exception("get flink client failed: " + e.getMessage());
+        }
     }
 
     /**
-     * get job status
-     * @return
+     * Get the job status by the given job id.
      */
-    public JobStatus getJobStatus(String jobId) {
+    public JobStatus getJobStatus(String jobId) throws Exception {
         try {
             RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
             CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID);
             return jobStatus.get();
         } catch (Exception e) {
-            log.error("get job status error: ", e);
+            log.error("get job status by jobId={} failed: ", jobId, e);
+            throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
         }
-        return null;
     }
 
     /**
-     * get job detail
-     * @return
+     * Get job detail by the given job id.
      */
-    public JobDetailsInfo getJobDetail(String jobId) {
+    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
         try {
             RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
             CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID);
             return jobDetails.get();
         } catch (Exception e) {
-            log.error("get job detail error: ", e);
+            log.error("get job detail by jobId={} failed: ", jobId, e);
+            throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
         }
-        return null;
     }
 
     /**
-     * submit job
-     * @param flinkInfo
+     * Submit the Flink job.
      */
-    public String submitJobs(FlinkInfo flinkInfo) {
-        RestClusterClient<StandaloneClusterId> client = null;
-        String localJarPath = flinkInfo.getLocalJarPath();
-        String[] programArgs = genProgramArgs(flinkInfo);
+    public String submit(FlinkInfo flinkInfo) throws Exception {
         try {
-            client = getFlinkClient();
-            Configuration configuration = initConfiguration();
-            File jarFile = new File(localJarPath);
-                SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
-                PackagedProgram program = PackagedProgram.newBuilder()
-                        .setConfiguration(configuration)
-                        .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
-                        .setJarFile(jarFile)
-                        .setArguments(programArgs)
-                        .setSavepointRestoreSettings(savepointRestoreSettings).build();
-                JobGraph jobGraph =
-                        PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
-                CompletableFuture<JobID> result = client.submitJob(jobGraph);
-                return result.get().toString();
+            SavepointRestoreSettings settings = SavepointRestoreSettings.none();
+            return submitJobBySavepoint(flinkInfo, settings);
         } catch (Exception e) {
-            log.error("submit job error: ", e);
+            log.error("submit job from info {} failed: ", flinkInfo, e);
+            throw new Exception("submit job failed: " + e.getMessage());
         }
-        return null;
     }
 
     /**
-     * restore job with savepoint
-     * @param flinkInfo
+     * Restore the Flink job.
      */
-    public String restore(FlinkInfo flinkInfo) {
-        RestClusterClient<StandaloneClusterId> client = null;
-        String localJarPath = flinkInfo.getLocalJarPath();
-        String[] programArgs = genProgramArgs(flinkInfo);
+    public String restore(FlinkInfo flinkInfo) throws Exception {
         try {
-            client = getFlinkClient();
-            Configuration configuration = initConfiguration();
-            File jarFile = new File(localJarPath);
             if (StringUtils.isNotEmpty(flinkInfo.getSavepointPath())) {
-                SavepointRestoreSettings savepointRestoreSettings =
-                        SavepointRestoreSettings.forPath(savepointDirectory,false);
-                PackagedProgram program = PackagedProgram.newBuilder()
-                        .setConfiguration(configuration)
-                        .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
-                        .setJarFile(jarFile)
-                        .setArguments(programArgs)
-                        .setSavepointRestoreSettings(savepointRestoreSettings).build();
-                JobGraph jobGraph =
-                        PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
-                CompletableFuture<JobID> result = client.submitJob(jobGraph);
-                return result.get().toString();
+                SavepointRestoreSettings settings = SavepointRestoreSettings.forPath(savepointDirectory, false);
+                return submitJobBySavepoint(flinkInfo, settings);
+            } else {
+                log.warn("skip to restore as the savepoint path was empty " + flinkInfo);
+                return null;
             }
         } catch (Exception e) {
-            log.error("restore job error: ", e);
+            log.error("restore job from info {} failed: ", flinkInfo, e);
+            throw new Exception("restore job failed: " + e.getMessage());
         }
-        return null;
     }
 
     /**
-     * stop job with savepoint
-     * @param jobId
-     * @param requestBody
-     * @return
+     * Submit the job with the savepoint settings.
      */
-    public String stopJobs(String jobId, StopWithSavepointRequestBody requestBody) {
+    private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
+        String localJarPath = flinkInfo.getLocalJarPath();
+        File jarFile = new File(localJarPath);
+        String[] programArgs = genProgramArgs(flinkInfo);
+
+        PackagedProgram program = PackagedProgram.newBuilder()
+                .setConfiguration(configuration)
+                .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
+                .setJarFile(jarFile)
+                .setArguments(programArgs)
+                .setSavepointRestoreSettings(settings).build();
+        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
+
+        RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+        CompletableFuture<JobID> result = client.submitJob(jobGraph);
+        return result.get().toString();
+    }
+
+    /**
+     * Stop the Flink job with the savepoint.
+     */
+    public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception {
         try {
             RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<String> stopResult =
-                    client.stopWithSavepoint(jobID,requestBody.isDrain(),requestBody.getTargetDirectory());
+            CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, request.isDrain(),
+                    request.getTargetDirectory());
             return stopResult.get();
         } catch (Exception e) {
-            log.error("stop job error: ", e);
+            log.error("stop job {} and request {} failed: ", jobId, request, e);
+            throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
         }
-        return null;
     }
 
     /**
-     * cancel job
-     * @param jobId
-     * @return
+     * Cancel the Flink job.
      */
-    public void cancelJobs(String jobId) {
+    public void cancelJob(String jobId) throws Exception {
         try {
             RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
             client.cancel(jobID);
         } catch (Exception e) {
-            log.error("cancel job error: ", e);
+            log.error("cancel job {} failed: ", jobId, e);
+            throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
         }
     }
 
     /**
-     * build the program of job
-     * @param flinkInfo
-     * @return
+     * Build the program of the Flink job.
      */
     private String[] genProgramArgs(FlinkInfo flinkInfo) {
-        List<String> list =  new ArrayList<>();
+        List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
         list.add("-dataflow.info.file");
@@ -271,14 +244,14 @@ public class FlinkService {
         list.add(flinkInfo.getSourceType());
         list.add("-sink.type");
         list.add(flinkInfo.getSinkType());
-        // one group one stream now
+        // TODO Support more than one stream with one group
         if (flinkInfo.getInlongStreamInfoList() != null
                 && !flinkInfo.getInlongStreamInfoList().isEmpty()) {
             InlongStreamInfo inlongStreamInfo = flinkInfo.getInlongStreamInfoList().get(0);
             list.add("-job.orderly.output");
             list.add(String.valueOf(inlongStreamInfo.getSyncSend()));
         }
-        String[] data = list.toArray(new String[list.size()]);
-        return data;
+        return list.toArray(new String[0]);
     }
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
similarity index 80%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 7399933de..1350a7276 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -21,37 +21,28 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequestBody;
+import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 @Slf4j
-public class IntergrationTaskRunner implements Runnable {
+public class IntegrationTaskRunner implements Runnable {
 
-    private FlinkService flinkService;
-    private FlinkInfo flinkInfo;
-    private Integer commitType;
     private static final Integer TRY_MAX_TIMES = 60;
     private static final Integer INTERVAL = 10;
+    private final FlinkService flinkService;
+    private final FlinkInfo flinkInfo;
+    private final Integer commitType;
 
-    public IntergrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo,Integer commitType) {
+    public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo, Integer commitType) {
         this.flinkService = flinkService;
         this.flinkInfo = flinkInfo;
         this.commitType = commitType;
     }
 
-    /**
-     * When an object implementing interface <code>Runnable</code> is used
-     * to create a thread, starting the thread causes the object's
-     * <code>run</code> method to be called in that separately executing
-     * thread.
-     * <p>The general contract of the method <code>run</code> is that it may
-     * take any action whatsoever.
-     *
-     * @see Thread#run()
-     */
     @Override
     public void run() {
         TaskCommitType commitType = TaskCommitType.getInstance(this.commitType);
@@ -61,7 +52,7 @@ public class IntergrationTaskRunner implements Runnable {
         switch (commitType) {
             case START_NOW:
                 try {
-                    String jobId = flinkService.submitJobs(flinkInfo);
+                    String jobId = flinkService.submit(flinkInfo);
                     flinkInfo.setJobId(jobId);
                     log.info("Start job {} success in backend", jobId);
                 } catch (Exception e) {
@@ -86,10 +77,10 @@ public class IntergrationTaskRunner implements Runnable {
                 break;
             case RESTART:
                 try {
-                    StopWithSavepointRequestBody stopWithSavepointRequestBody = new StopWithSavepointRequestBody();
-                    stopWithSavepointRequestBody.setDrain(Constants.DRAIN);
-                    stopWithSavepointRequestBody.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
-                    String location = flinkService.stopJobs(flinkInfo.getJobId(), stopWithSavepointRequestBody);
+                    StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
+                    stopWithSavepointRequest.setDrain(Constants.DRAIN);
+                    stopWithSavepointRequest.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
+                    String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
                     log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
                     int times = 0;
@@ -124,11 +115,11 @@ public class IntergrationTaskRunner implements Runnable {
                 break;
             case STOP:
                 try {
-                    StopWithSavepointRequestBody stopWithSavepointRequestBody = new StopWithSavepointRequestBody();
-                    stopWithSavepointRequestBody.setDrain(Constants.DRAIN);
+                    StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
+                    stopWithSavepointRequest.setDrain(Constants.DRAIN);
                     FlinkConfig flinkConfig = flinkService.getFlinkConfig();
-                    stopWithSavepointRequestBody.setTargetDirectory(flinkConfig.getSavepointDirectory());
-                    String location = flinkService.stopJobs(flinkInfo.getJobId(), stopWithSavepointRequestBody);
+                    stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
+                    String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
                     log.info("the jobId {} savepoint: {} ", flinkInfo.getJobId(), location);
                 } catch (Exception e) {
@@ -141,7 +132,7 @@ public class IntergrationTaskRunner implements Runnable {
                 break;
             case DELETE:
                 try {
-                    flinkService.cancelJobs(flinkInfo.getJobId());
+                    flinkService.cancelJob(flinkInfo.getJobId());
                     log.info("delete job {} success in backend", flinkInfo.getJobId());
                     JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId());
                     if (jobStatus.isTerminalState()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java
deleted file mode 100644
index 4b9fcdc69..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.flink;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.BusinessExceptionDesc;
-import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
-import org.apache.inlong.manager.plugin.util.FlinkUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
-
-/**
- * flink operation
- */
-@Slf4j
-public class ManagerFlinkTask {
-
-    private FlinkService flinkService;
-
-    public ManagerFlinkTask(FlinkService flinkService) {
-        this.flinkService = flinkService;
-    }
-
-    public void start(FlinkInfo flinkInfo) throws IOException {
-        String jobId = flinkInfo.getJobId();
-        //Start a new task without savepoint
-        if (StringUtils.isEmpty(jobId)) {
-            try {
-                Future<?> future = TaskRunService.submit(
-                        new IntergrationTaskRunner(flinkService, flinkInfo,
-                                TaskCommitType.START_NOW.getCode()));
-                future.get();
-            } catch (Exception e) {
-                log.warn("Flink job some exception [{}]", e);
-                throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
-                        + e.getMessage());
-            }
-            //Restore an old task with savepoint
-            } else {
-            JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
-            if (jobDetailsInfo == null) {
-                throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
-                        + String.format("Flink job %s not found", flinkInfo.getJobId()));
-            }
-            JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-           if (!jobStatus.isTerminalState() && StringUtils.isNotEmpty(flinkInfo.getSavepointPath())) {
-               try {
-                   Future<?> future = TaskRunService.submit(
-                           new IntergrationTaskRunner(flinkService, flinkInfo,
-                                   TaskCommitType.RESUME.getCode()));
-                   future.get();
-               } catch (Exception e) {
-                   log.warn("Flink job some exception [{}]", e);
-                   throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
-                           + e.getMessage());
-               }
-           } else {
-               throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
-                       + "not support resume when task has been terminaled or SavepointPath is null ");
-           }
-        }
-    }
-
-    /**
-     * genPath
-     * @param flinkInfo
-     * @param dataflow
-     */
-    public void genPath(FlinkInfo flinkInfo, String dataflow) {
-        String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-        path = path.substring(0, path.lastIndexOf(File.separator));
-        if (path.contains("inlong-manager")) {
-            String startPath = path.substring(0, path.indexOf("inlong-manager"));
-            String resource = "inlong-sort";
-            String basePath = startPath  + resource;
-            File file = new File(basePath);
-            if (!file.exists()) {
-                log.warn("file path:[{}] not found sort jar", basePath);
-                throw new BusinessException(BusinessExceptionDesc.InternalError + " not found inlong-sort");
-            }
-            String jarPath = findFiles(basePath,"^sort-single-tenant.*jar$");
-            log.info("sort-single-tenant path :{}",jarPath);
-            flinkInfo.setLocalJarPath(jarPath);
-        } else {
-            throw new BusinessException(BusinessExceptionDesc.InternalError + " inlong-manager dic not found");
-        }
-        if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), dataflow)) {
-            flinkInfo.setLocalConfPath(path + File.separator + flinkInfo.getJobName());
-        } else {
-            throw new BusinessException(BusinessExceptionDesc.InternalError + " write file fail");
-        }
-    }
-
-    /**
-     * restart flinkjob
-     * @param flinkInfo
-     * @throws Exception
-     * @throws IOException
-     */
-    public void restart(FlinkInfo flinkInfo) throws Exception, IOException {
-        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
-        if (jobDetailsInfo == null) {
-            throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
-                    + String.format("Flink job %s not found", flinkInfo.getJobId()));
-        }
-        JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-        if (!jobStatus.isTerminalState()) {
-            Future<?> future = TaskRunService.submit(
-                    new IntergrationTaskRunner(flinkService, flinkInfo,
-                            TaskCommitType.RESTART.getCode()));
-            future.get();
-        } else {
-            throw new BusinessException(BusinessExceptionDesc.FailedOperation.getMessage()
-                    + String.format("Flink job %s restart fail", flinkInfo.getJobId()));
-        }
-    }
-
-    /**
-     * stop flinkjob
-     * @param flinkInfo
-     * @throws Exception
-     */
-    public void stop(FlinkInfo flinkInfo) throws Exception {
-        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
-        if (jobDetailsInfo == null) {
-            throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
-                    + String.format("Flink job %s not found", flinkInfo.getJobId()));
-        }
-        JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-        if (!jobStatus.isTerminalState()) {
-            Future<?> future = TaskRunService.submit(
-                    new IntergrationTaskRunner(flinkService, flinkInfo,
-                            TaskCommitType.STOP.getCode()));
-            future.get();
-        } else {
-            throw new BusinessException(BusinessExceptionDesc.FailedOperation.getMessage()
-                    + String.format("Flink job %s stop fail", flinkInfo.getJobId()));
-        }
-    }
-
-    /**
-     * delete flinkjob
-     * @param flinkInfo
-     * @throws Exception
-     */
-    public void delete(FlinkInfo flinkInfo) throws Exception {
-        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
-        if (jobDetailsInfo == null) {
-            throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
-                    + String.format("Flink job %s not found", flinkInfo.getJobId()));
-        }
-        JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-        if (jobStatus.isTerminalState()) {
-            if (jobStatus.isGloballyTerminalState()) {
-                throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
-                        + "not support delete when task has been terminaled globally");
-            } else {
-                throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
-                        + "not support delete when task has been terminaled locally");
-            }
-        } else {
-            Future<?> future = TaskRunService.submit(
-                    new IntergrationTaskRunner(flinkService, flinkInfo,
-                            TaskCommitType.DELETE.getCode()));
-            future.get();
-        }
-    }
-
-    /**
-     * poll status
-     * @param flinkInfo
-     * @throws InterruptedException
-     */
-    public void pollFlinkStatus(FlinkInfo flinkInfo) throws InterruptedException {
-        if (flinkInfo.isException()) {
-            throw new BusinessException("startup fail reason:" + flinkInfo.getExceptionMsg());
-        }
-        TimeUnit.SECONDS.sleep(5);
-        while (true) {
-            if (StringUtils.isNotEmpty(flinkInfo.getJobId())) {
-                JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
-                if (jobDetailsInfo == null) {
-                    throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
-                            + String.format("Flink job %s not found", flinkInfo.getJobId()));
-                }
-                JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-
-                if (jobStatus.isTerminalState()) {
-                    log.warn("flink job fail for status [{}]", jobStatus);
-                    throw new BusinessException(
-                            "startup fail " + jobStatus + flinkInfo.getExceptionMsg());
-                } else {
-                    if (jobStatus == RUNNING) {
-                        log.info("Flink status is Running");
-                        break;
-                    } else {
-                            log.info("poll Flink status");
-                            TimeUnit.SECONDS.sleep(5);
-                            continue;
-                        }
-                    }
-                }
-            }
-        }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
index 277addcf3..aedf0128b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
@@ -25,7 +25,7 @@ public class JarEntryInfo {
     /**
      * description
      */
-    private String deacription;
+    private String description;
 
     /**
      * name
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
similarity index 93%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
index 3f0729b32..ab071399b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
@@ -22,9 +22,9 @@ import lombok.Data;
 import java.util.List;
 
 @Data
-public class JarRunRequestbody {
+public class JarRunRequest {
 
-    private  Boolean allowNonRestoredState;
+    private Boolean allowNonRestoredState;
 
     private String entryClass;
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
index cfbe8d083..868d84b7d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
@@ -33,7 +33,7 @@ public class LoginConf {
     private Integer restPort;
 
     /**
-     * jobmanager port
+     * job manager port
      */
     private Integer jobManagerPort;
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
similarity index 95%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
index 3e51ee0c6..13808023c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
@@ -22,7 +22,7 @@ import lombok.Setter;
 
 @Getter
 @Setter
-public class StopWithSavepointRequestBody {
+public class StopWithSavepointRequest {
 
     /**
      * tag
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java
deleted file mode 100644
index 59e3f2b31..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.flink.enums;
-
-public enum BusinessExceptionDesc {
-
-    ResourceNotFound("ResourceNotFound"),
-    InternalError("InternalError"),
-    FailedOperation("FailedOperation"),
-    UnsupportedOperation("UnsupportedOperation"),
-    ;
-
-    private final String message;
-
-    BusinessExceptionDesc(String message) {
-        this.message = message;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
similarity index 97%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 5db9084da..0a44c2bd8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.plugin.flink;
+package org.apache.inlong.manager.plugin.flink.enums;
 
 public class Constants {
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index a6924ff2b..c7b1107d7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -18,16 +18,16 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -42,6 +42,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 
 @Slf4j
 public class DeleteSortListener implements SortOperateListener {
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -49,47 +50,59 @@ public class DeleteSortListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        String inlongGroupId = context.getProcessForm().getInlongGroupId();
-        ObjectMapper objectMapper = new ObjectMapper();
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
-        InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> inlongGroupExtInfoList = inlongGroupInfo.getExtList();
-        log.info("inlongGroupExtInfoList:{}", inlongGroupExtInfoList);
-        Map<String, String> kvConf =
-                inlongGroupExtInfoList.stream().collect(Collectors.toMap(InlongGroupExtInfo::getKeyName,
-                InlongGroupExtInfo::getKeyValue));
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
+
+        UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+        InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+        log.info("inlong group ext info: {}", extList);
+
+        Map<String, String> kvConf = extList.stream().collect(
+                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("groupId [%s] not add deleteProcess listener, "
-                    + "as the sortProperties is empty", inlongGroupId);
-            log.warn(message);
+            String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
+                    groupId);
+            log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
-                new TypeReference<Map<String, String>>(){});
+
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+                new TypeReference<Map<String, String>>() {
+                });
         kvConf.putAll(result);
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        if (StringUtils.isBlank(jobId)) {
+            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            return ListenerResult.fail(message);
+        }
 
         FlinkInfo flinkInfo = new FlinkInfo();
-
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
-        Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
         flinkInfo.setJobId(jobId);
-
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-
+        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            managerFlinkTask.delete(flinkInfo);
+            flinkOperation.delete(flinkInfo);
+            log.info("job delete success for [{}]", jobId);
+            return ListenerResult.success();
         } catch (Exception e) {
-            log.error("pause exception ", e);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            managerFlinkTask.pollFlinkStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo);
+
+            String message = String.format("delete sort failed for groupId [%s] ", groupId);
+            log.error(message, e);
+            return ListenerResult.fail(message + e.getMessage());
         }
-        return ListenerResult.success();
     }
 
     @Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 47c92547e..b81c72ee6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -19,17 +19,17 @@ package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -54,68 +54,83 @@ public class RestartSortListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        String inlongGroupId = context.getProcessForm().getInlongGroupId();
-        ObjectMapper objectMapper = new ObjectMapper();
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
-        InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
-        log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
-        Map<String, String> kvConf = inlongGroupExtInfos.stream()
-                .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
+
+        UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+        InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+        log.info("inlong group ext info: {}", extList);
+
+        Map<String, String> kvConf = extList.stream().collect(
+                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("inlongGroupId:%s not add restartProcess listener,sortProperties is empty",
-                    inlongGroupId);
-            log.warn(message);
+            String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
+                    groupId);
+            log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
-                new TypeReference<Map<String, String>>(){});
+
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+                new TypeReference<Map<String, String>>() {
+                });
         kvConf.putAll(result);
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        if (StringUtils.isBlank(jobId)) {
+            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            return ListenerResult.fail(message);
+        }
         String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("groupId [%s] not add restartProcess listener, "
-                    + "as the dataflows is empty", inlongGroupId);
-            log.warn(message);
+            String message = String.format("dataflow is empty for groupId [%s]", groupId);
+            log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, JsonNode> dataflowMap = objectMapper.convertValue(objectMapper.readTree(dataFlows),
-                new TypeReference<Map<String, JsonNode>>(){});
+
+        // TODO Support more than one dataflow in one sort job
+        Map<String, JsonNode> dataflowMap = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
+                });
         Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
         JsonNode dataFlow = null;
         if (dataflowOptional.isPresent()) {
             dataFlow = dataflowOptional.get();
         }
         if (Objects.isNull(dataFlow)) {
-            String message = String.format("groupId [%s] not add restartProcess listener, "
-                    + "as the dataflow is empty", inlongGroupId);
+            String message = String.format("dataflow is empty for groupId [%s]", groupId);
             log.warn(message);
             return ListenerResult.fail(message);
         }
-        String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobName(jobName);
 
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
-        Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
+        FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-
+        String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
+        flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-        managerFlinkTask.genPath(flinkInfo,dataFlow.toString());
-
+        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            managerFlinkTask.restart(flinkInfo);
+            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.restart(flinkInfo);
+            log.info("job restart success for [{}]", jobId);
+            return ListenerResult.success();
         } catch (Exception e) {
-            log.error("pause exception ", e);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            managerFlinkTask.pollFlinkStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo);
+
+            String message = String.format("restart sort failed for groupId [%s] ", groupId);
+            log.error(message, e);
+            return ListenerResult.fail(message + e.getMessage());
         }
-        return ListenerResult.success();
     }
 
     @Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 919379afa..0f74b0164 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -19,16 +19,17 @@ package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -45,6 +46,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 
 @Slf4j
 public class StartupSortListener implements SortOperateListener {
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -52,100 +54,98 @@ public class StartupSortListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) context.getProcessForm();
-        InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
-        log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
-        Map<String, String> kvConf = inlongGroupExtInfos.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            String message = String.format("process form was not GroupResource for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
+
+        GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+        InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
+        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+        log.info("inlong group ext info: {}", extList);
+
+        Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
                 && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
                 InlongGroupExtInfo::getKeyName,
                 InlongGroupExtInfo::getKeyValue));
+        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+                    new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
+        }
+
         String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
-        String inlongGroupId = context.getProcessForm().getInlongGroupId();
         if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("groupId [%s] not add startupProcess listener, "
-                    + "as the dataflows is empty", inlongGroupId);
-            log.warn(message);
+            String message = String.format("dataflow is empty for groupId [%s]", groupId);
+            log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, JsonNode> dataflowMap = objectMapper.convertValue(objectMapper.readTree(dataFlows),
-                new TypeReference<Map<String, JsonNode>>(){});
+        Map<String, JsonNode> dataflowMap = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
+                });
         Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
         JsonNode dataFlow = null;
         if (dataflowOptional.isPresent()) {
             dataFlow = dataflowOptional.get();
         }
         if (Objects.isNull(dataFlow)) {
-            String message = String.format("groupId [%s] not add startupProcess listener, "
-                    + "as the dataflow is empty", inlongGroupId);
+            String message = String.format("dataflow is empty for groupId [%s]", groupId);
             log.warn(message);
-            ListenerResult.fail(message);
+            return ListenerResult.fail(message);
         }
 
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
-                    new TypeReference<Map<String, String>>(){});
-            kvConf.putAll(result);
-        }
         FlinkInfo flinkInfo = new FlinkInfo();
-        parseDataflow(dataFlow, flinkInfo);
-
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
-
-        flinkInfo.setInlongStreamInfoList(groupResourceProcessForm.getInlongStreamInfoList());
+        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
+        flinkInfo.setInlongStreamInfoList(groupResourceForm.getInlongStreamInfoList());
+        parseDataflow(dataFlow, flinkInfo);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-        managerFlinkTask.genPath(flinkInfo, dataFlow.toString());
+        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
 
         try {
-             managerFlinkTask.start(flinkInfo);
-            log.info("the jobId {} submit success", flinkInfo.getJobId());
+            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.start(flinkInfo);
+            log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
         } catch (Exception e) {
-            log.warn("startup exception: ", e);
-            managerFlinkTask.pollFlinkStatus(flinkInfo);
+            // TODO investigate why pollJobStatus is invoked repeatedly here; one call should suffice
+            flinkOperation.pollJobStatus(flinkInfo);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            managerFlinkTask.pollFlinkStatus(flinkInfo);
-        }
-
-        managerFlinkTask.pollFlinkStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo);
 
-        saveInfo(context.getProcessForm().getInlongGroupId(), InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(),
-                inlongGroupExtInfos);
+            String message = String.format("startup sort failed for groupId [%s] ", groupId);
+            log.error(message, e);
+            return ListenerResult.fail(message + e.getMessage());
+        }
 
-        managerFlinkTask.pollFlinkStatus(flinkInfo);
+        saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+        flinkOperation.pollJobStatus(flinkInfo);
         return ListenerResult.success();
     }
 
     /**
-     * save info
-     * @param inlongGroupId
-     * @param keyName
-     * @param keyValue
-     * @param inlongGroupExtInfos
+     * Save ext info into list.
      */
-    private void saveInfo(String inlongGroupId, String keyName, String keyValue,
-            List<InlongGroupExtInfo> inlongGroupExtInfos) {
-        InlongGroupExtInfo inlongGroupExtInfo = new InlongGroupExtInfo();
-        inlongGroupExtInfo.setInlongGroupId(inlongGroupId);
-        inlongGroupExtInfo.setKeyName(keyName);
-        inlongGroupExtInfo.setKeyValue(keyValue);
-        inlongGroupExtInfos.add(inlongGroupExtInfo);
+    private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) {
+        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+        extInfo.setInlongGroupId(inlongGroupId);
+        extInfo.setKeyName(keyName);
+        extInfo.setKeyValue(keyValue);
+        extInfoList.add(extInfo);
     }
 
     /**
-     * init FlinkConf
-     * @param dataflow
-     * @param flinkInfo
+     * Init FlinkConf
      */
-    private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo)  {
+    private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
         JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
         String sourceType = sourceInfo.get(Constants.TYPE).asText();
         flinkInfo.setSourceType(sourceType);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index a642660e1..c822b828e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -18,16 +18,16 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -42,6 +42,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 
 @Slf4j
 public class SuspendSortListener implements SortOperateListener {
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -49,46 +50,59 @@ public class SuspendSortListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        String inlongGroupId = context.getProcessForm().getInlongGroupId();
-        ObjectMapper objectMapper = new ObjectMapper();
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
-        InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
-        log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
-        Map<String, String> kvConf = inlongGroupExtInfos.stream().collect(Collectors.toMap(
-                InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
+
+        UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+        InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+        log.info("inlong group ext info: {}", extList);
+
+        Map<String, String> kvConf = extList.stream().collect(
+                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("groupId [%s] not add suspendProcess listener, "
-                    + "as the sortProperties is empty", inlongGroupId);
-            log.warn(message);
+            String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
+                    groupId);
+            log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
-                new TypeReference<Map<String, String>>(){});
+
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+                new TypeReference<Map<String, String>>() {
+                });
         kvConf.putAll(result);
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        if (StringUtils.isBlank(jobId)) {
+            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            return ListenerResult.fail(message);
+        }
 
         FlinkInfo flinkInfo = new FlinkInfo();
-
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
-        Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
         flinkInfo.setJobId(jobId);
-
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-
+        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            managerFlinkTask.stop(flinkInfo);
+            flinkOperation.stop(flinkInfo);
+            log.info("job suspend success for [{}]", jobId);
+            return ListenerResult.success();
         } catch (Exception e) {
-            log.error("pause exception ", e);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            managerFlinkTask.pollFlinkStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo);
+
+            String message = String.format("suspend sort failed for groupId [%s] ", groupId);
+            log.error(message, e);
+            return ListenerResult.fail(message + e.getMessage());
         }
-        return ListenerResult.success();
     }
 
     @Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index 5092739f3..db91d5271 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -17,10 +17,7 @@
 
 package org.apache.inlong.manager.plugin.util;
 
-import com.google.gson.JsonPrimitive;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-import org.apache.inlong.manager.plugin.flink.enums.BusinessExceptionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,66 +25,62 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.inlong.manager.plugin.flink.Constants.ADDRESS;
-import static org.apache.inlong.manager.plugin.flink.Constants.JOB_MANAGER_PORT;
-import static org.apache.inlong.manager.plugin.flink.Constants.PARALLELISM;
-import static org.apache.inlong.manager.plugin.flink.Constants.PORT;
-import static org.apache.inlong.manager.plugin.flink.Constants.SAVEPOINT_DIRECTORY;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
 
 /**
- * flink configuration. Only one instance in the process.
- * Basically it use properties file to store configurations.
+ * Configuration file for Flink, only one instance in the process.
+ * Basically it uses a properties file to store the configurations.
  */
-public class FlinkConfiguration  {
+public class FlinkConfiguration {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfiguration.class);
 
     private static final String DEFAULT_CONFIG_FILE = "flink-sort-plugin.properties";
+    private static final String INLONG_MANAGER = "inlong-manager";
 
-    private final Map<String, JsonPrimitive> configStorage = new HashMap<>();
+    private final FlinkConfig flinkConfig;
 
-    private  FlinkConfig flinkConfig;
+    /**
+     * load config from flink file.
+     */
+    public FlinkConfiguration() throws Exception {
+        String path = formatPath();
+        flinkConfig = getFlinkConfigFromFile(path);
+    }
 
     /**
      * fetch DEFAULT_CONFIG_FILE full path
-     * @return
      */
-    private String formatPath() {
+    private String formatPath() throws Exception {
         String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-        LOGGER.info("format first path {}",path);
-        String inlongManager = "inlong-manager";
-        if (path.contains(inlongManager)) {
-            path = path.substring(0, path.indexOf(inlongManager));
-            String confPath = path + inlongManager + File.separator + "plugins" + File.separator + DEFAULT_CONFIG_FILE;
-            LOGGER.info("flink-sort-plugin.properties path : {}",confPath);
-            File file = new File(confPath);
-            if (!file.exists()) {
-                LOGGER.warn("plugins config file path:[{}] not found flink-sort-plugin.properties", confPath);
-                throw new BusinessException(BusinessExceptionDesc.InternalError
-                        + " not found flink-sort-plugin.properties");
-            }
-            return confPath;
-        } else {
-            throw new BusinessException(BusinessExceptionDesc.InternalError + "plugins dictionary not found ");
+        LOGGER.info("format first path {}", path);
+
+        int index = path.indexOf(INLONG_MANAGER);
+        if (index == -1) {
+            throw new Exception(INLONG_MANAGER + " path not found in " + path);
         }
-    }
 
-    /**
-     * load config from flink file.
-     */
-    public FlinkConfiguration() throws IOException {
-        String path = formatPath();
-        flinkConfig = getFlinkConfigFromFile(path);
+        path = path.substring(0, index);
+        String confPath = path + INLONG_MANAGER + File.separator + "plugins" + File.separator + DEFAULT_CONFIG_FILE;
+        File file = new File(confPath);
+        if (!file.exists()) {
+            String message = String.format("not found %s in path %s", DEFAULT_CONFIG_FILE, confPath);
+            LOGGER.error(message);
+            throw new Exception(message);
+        }
 
+        LOGGER.info("after format, {} located in {}", DEFAULT_CONFIG_FILE, confPath);
+        return confPath;
     }
 
     /**
-     * get flinkcongfig
-     * @return
+     * get flink config
      */
     public FlinkConfig getFlinkConfig() {
         return flinkConfig;
@@ -95,16 +88,11 @@ public class FlinkConfiguration  {
 
     /**
      * parse properties
-     * @param fileName
-     * @return
-     * @throws IOException
      */
-    private  FlinkConfig getFlinkConfigFromFile(String fileName) throws IOException {
+    private FlinkConfig getFlinkConfigFromFile(String fileName) throws IOException {
         Properties properties = new Properties();
         BufferedReader bufferedReader = new BufferedReader(new FileReader(fileName));
         properties.load(bufferedReader);
-        properties.forEach((key, value) -> configStorage.put((String) key,
-                new JsonPrimitive((String) value)));
         FlinkConfig flinkConfig = new FlinkConfig();
         flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
         flinkConfig.setAddress(properties.getProperty(ADDRESS));
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 7e3654fd8..f610c652f 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -36,25 +36,23 @@ import java.util.regex.Pattern;
 
 @Slf4j
 public class FlinkUtils {
+
     public static final String BASE_DIRECTORY = "config";
 
-    public static final List<String> FLINK_VERSION_COLLECTION = Arrays.asList("Flink-1.13");
+    public static final List<String> FLINK_VERSION_COLLECTION = Collections.singletonList("Flink-1.13");
 
     /**
      * getLatestFlinkVersion
-     * @param supportedFlink
-     * @return
      */
-    public static String getLatestFlinkVersion(String [] supportedFlink) {
+    public static String getLatestFlinkVersion(String[] supportedFlink) {
         if (Objects.isNull(supportedFlink)) {
             return null;
         }
         Arrays.sort(supportedFlink, Collections.reverseOrder());
         String latestFinkVersion = null;
-        for (int i = 0; i < supportedFlink.length; i++) {
-            String flinkVersion = supportedFlink[i];
+        for (String flinkVersion : supportedFlink) {
             latestFinkVersion = FLINK_VERSION_COLLECTION.stream()
-                            .filter(v -> v.equals(flinkVersion)).findFirst().orElse(null);
+                    .filter(v -> v.equals(flinkVersion)).findFirst().orElse(null);
             if (Objects.nonNull(latestFinkVersion)) {
                 return latestFinkVersion;
             }
@@ -64,8 +62,6 @@ public class FlinkUtils {
 
     /**
      * print exception
-     * @param throwable
-     * @return
      */
     public static String getExceptionStackMsg(Throwable throwable) {
         StringWriter stringWriter = new StringWriter();
@@ -75,24 +71,22 @@ public class FlinkUtils {
 
     /**
      * fetch sort-single-tenant jar path
-     * @param baseDirName
-     * @return
      */
-    public static String findFiles(String baseDirName,String pattern) {
+    public static String findFiles(String baseDirName, String pattern) {
         File baseDir = new File(baseDirName);
         if (!baseDir.exists() || !baseDir.isDirectory()) {
             log.error("baseDirName find fail :{}", baseDirName);
             return null;
         }
-        String tempName = null;
+        String tempName;
         File tempFile;
         File[] files = baseDir.listFiles();
-        if (files.length == 0) {
+        if (files == null || files.length == 0) {
             log.info("baseDirName is empty");
             return null;
         }
-        for (int i = 0; i < files.length; i++) {
-            tempFile = files[i];
+        for (File file : files) {
+            tempFile = file;
             tempName = tempFile.getName();
             Pattern jarPathPattern = Pattern.compile(pattern);
             Matcher matcher = jarPathPattern.matcher(tempName);
@@ -106,9 +100,6 @@ public class FlinkUtils {
 
     /**
      * get value
-     * @param key
-     * @param defaultValue
-     * @return
      */
     public static String getValue(String key, String defaultValue) {
         return StringUtils.isNotEmpty(key) ? key : defaultValue;
@@ -116,8 +107,6 @@ public class FlinkUtils {
 
     /**
      * getConfigDirectory
-     * @param name
-     * @return
      */
     public static String getConfigDirectory(String name) {
         return BASE_DIRECTORY + File.separator + name;
@@ -125,10 +114,6 @@ public class FlinkUtils {
 
     /**
      * writeConfigToFile
-     * @param configJobDirectory
-     * @param configFileName
-     * @param content
-     * @return
      */
     public static boolean writeConfigToFile(String configJobDirectory, String configFileName, String content) {
         File file = new File(configJobDirectory);
@@ -151,8 +136,6 @@ public class FlinkUtils {
 
     /**
      * delete configuration file
-     * @param name
-     * @return
      */
     public static boolean deleteConfigFile(String name) {
         String configDirectory = getConfigDirectory(name);
@@ -161,7 +144,7 @@ public class FlinkUtils {
             try {
                 FileUtils.deleteDirectory(file);
             } catch (IOException e) {
-                log.error("delete %s failed", configDirectory);
+                log.error("delete {} failed", configDirectory, e);
                 return false;
             }
         }
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index 94008dcba..fa150125c 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.Test;
 
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 7bdc16dd1..49d6b13a4 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -54,7 +54,7 @@ public class WorkflowFormParserUtils {
 
         UserTask userTask = (UserTask) task;
         try {
-            JavaType javaType = JsonUtils.MAPPER.constructType(userTask.getFormClass());
+            JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(userTask.getFormClass());
             return JsonUtils.parse(workflowTaskEntity.getFormData(), javaType);
         } catch (Exception e) {
             log.error("task form parse failed, form is: {}", workflowTaskEntity.getFormData(), e);
@@ -74,7 +74,7 @@ public class WorkflowFormParserUtils {
         }
 
         try {
-            JavaType javaType = JsonUtils.MAPPER.constructType(process.getFormClass());
+            JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(process.getFormClass());
             return JsonUtils.parse(form, javaType);
         } catch (Exception e) {
             log.error("process form parse failed, form is: {}", form, e);