Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/29 15:37:06 UTC

[incubator-streampark] branch dev updated: [Bug] flink cluster management bug fixed (#2100)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6d6644392 [Bug] flink cluster management bug fixed (#2100)
6d6644392 is described below

commit 6d664439265535980789808802a9e06ec817553d
Author: benjobs <be...@apache.org>
AuthorDate: Tue Nov 29 23:36:59 2022 +0800

    [Bug] flink cluster management bug fixed (#2100)
---
 .../streampark/common/enums/ClusterState.java      |   7 +-
 .../streampark/common/util/HadoopConfigUtils.scala |   6 +-
 .../src/assembly/script/schema/mysql-schema.sql    |   1 -
 .../src/assembly/script/schema/pgsql-schema.sql    |   1 -
 .../src/assembly/script/upgrade/mysql/2.0.0.sql    |   4 +-
 .../streampark/console/base/util/CommonUtils.java  |  11 +-
 .../core/controller/FlinkClusterController.java    |  48 ++--
 .../console/core/entity/AppBuildPipeline.java      |  10 +-
 .../console/core/entity/Application.java           |  11 +-
 .../console/core/entity/FlinkCluster.java          |  61 +++--
 .../console/core/mapper/ApplicationMapper.java     |   4 +
 .../console/core/mapper/FlinkClusterMapper.java    |   2 +-
 .../console/core/service/ApplicationService.java   |   3 +
 .../console/core/service/FlinkClusterService.java  |  14 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |   6 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  84 ++++---
 .../core/service/impl/FlinkClusterServiceImpl.java | 245 +++++++++++----------
 .../src/main/resources/db/schema-h2.sql            |   1 -
 .../resources/mapper/core/ApplicationMapper.xml    |  16 ++
 .../resources/mapper/core/FlinkClusterMapper.xml   |   8 +-
 .../src/api/flink/app/app.type.ts                  |   2 -
 .../src/enums/flinkEnum.ts                         |   6 +-
 .../src/locales/lang/en/flink/app.ts               |   2 -
 .../src/locales/lang/en/flink/setting.ts           |   2 +-
 .../src/locales/lang/zh-CN/flink/app.ts            |   2 -
 .../src/locales/lang/zh-CN/flink/setting.ts        |   2 +-
 .../src/views/flink/app/Add.vue                    |  11 +-
 .../src/views/flink/app/EditFlink.vue              |   1 -
 .../src/views/flink/app/EditStreamPark.vue         |  27 +--
 .../flink/app/hooks/useCreateAndEditSchema.ts      |  12 +-
 .../src/views/flink/app/hooks/useFlinkSchema.ts    |  20 +-
 .../src/views/flink/app/utils/index.ts             |   5 -
 .../src/views/flink/setting/AddCluster.vue         |  19 +-
 .../src/views/flink/setting/EditCluster.vue        |  36 +--
 .../setting/components/FlinkClusterSetting.vue     | 184 ++++------------
 .../views/flink/setting/hooks/useClusterSetting.ts |  94 +++++---
 .../flink/submit/bean/DeployRequest.scala          |   1 -
 .../flink/submit/bean/DeployResponse.scala         |   4 +-
 .../impl/KubernetesNativeApplicationSubmit.scala   |   1 -
 .../impl/KubernetesNativeSessionSubmit.scala       |   3 +-
 .../streampark/flink/submit/impl/LocalSubmit.scala |   3 +-
 .../flink/submit/impl/RemoteSubmit.scala           |   3 +-
 .../flink/submit/impl/YarnSessionSubmit.scala      |  13 +-
 .../submit/trait/KubernetesNativeSubmitTrait.scala |   3 +-
 .../flink/submit/trait/YarnSubmitTrait.scala       |   4 +-
 45 files changed, 457 insertions(+), 546 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
index cc2f70138..74d12f051 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
@@ -34,7 +34,12 @@ public enum ClusterState implements Serializable {
     /**
      * cluster stopped
      */
-    STOPED(2);
+    STOPED(2),
+
+    /**
+     * cluster lost
+     */
+    LOST(3);
 
     private final Integer value;
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 3cfadbb68..8c67ea77e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.fs.LfsOperator
 import org.apache.commons.io.{FileUtils => ApacheFileUtils}
 
 import java.io.File
-import java.util.{Map => JavaMap, Optional => JOption}
+import java.util.{Collections, Map => JavaMap, Optional => JOption}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.immutable.ListMap
@@ -117,7 +117,7 @@ object HadoopConfigUtils {
           .filter(f => HADOOP_CLIENT_CONF_FILES.contains(f.getName))
           .map(f => f.getName -> ApacheFileUtils.readFileToString(f, "UTF-8"))
           .toMap.asJava)
-      .getOrElse(Maps.newHashMap[String, String])
+      .getOrElse(Collections.emptyMap[String, String]())
 
   /**
    * Read system hive config to Map
@@ -129,7 +129,7 @@ object HadoopConfigUtils {
           .filter(f => HIVE_CLIENT_CONF_FILES.contains(f.getName))
           .map(f => f.getName -> ApacheFileUtils.readFileToString(f, "UTF-8"))
           .toMap.asJava)
-      .getOrElse(Maps.newHashMap[String, String])
+      .getOrElse(Collections.emptyMap[String, String]())
 
   }
 
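
One behavioral note on this substitution (a minimal JDK-only sketch, not part of the patch): unlike the Maps.newHashMap() it replaces, Collections.emptyMap() returns a shared immutable instance, so callers must treat the returned map as read-only.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyMapDemo {
    public static void main(String[] args) {
        Map<String, String> mutable = new HashMap<>(); // what Maps.newHashMap() returned
        mutable.put("k", "v");                         // fine

        Map<String, String> shared = Collections.emptyMap(); // the new fallback value
        try {
            shared.put("k", "v");                      // throws: the map is immutable
        } catch (UnsupportedOperationException e) {
            System.out.println("emptyMap() must not be mutated by callers");
        }
    }
}
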
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
index dbc244896..dda27ac56 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
@@ -461,7 +461,6 @@ create table `t_flink_cluster` (
   `dynamic_properties` text comment 'allows specifying multiple generic configuration options',
   `k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
   `k8s_hadoop_integration` tinyint default 0,
-  `flame_graph` tinyint default 0 comment 'flameGraph enable,default disable',
   `k8s_conf` varchar(255) default null comment 'the path where the k8s configuration file is located',
   `resolve_order` int default null,
   `exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
index 38ec0c479..ecefb7de6 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
@@ -311,7 +311,6 @@ create table "public"."t_flink_cluster" (
   "dynamic_properties" text collate "pg_catalog"."default",
   "k8s_rest_exposed_type" int2 default 2,
   "k8s_hadoop_integration" boolean default false,
-  "flame_graph" boolean default false,
   "k8s_conf" varchar(255) collate "pg_catalog"."default",
   "resolve_order" int4,
   "exception" text collate "pg_catalog"."default",
diff --git a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
index 888325aee..872dee223 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
@@ -79,7 +79,9 @@ alter table `t_flink_project`
     add column `modify_time` datetime not null default current_timestamp on update current_timestamp after `create_time`,
     add index `inx_team` (`team_id`) using btree;
 
-alter table `t_flink_cluster` add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
+alter table `t_flink_cluster`
+    drop column `flame_graph`,
+    add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
 
 -- change `update_time` to `modify_time`
 alter table `t_app_build_pipe` change column `update_time` `modify_time` datetime not null default current_timestamp on update current_timestamp;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 593fc9415..996a9251f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.base.util;
 
-import org.apache.streampark.common.util.AssertUtils;
+import  org.apache.streampark.common.util.AssertUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.cglib.beans.BeanMap;
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -637,4 +638,12 @@ public final class CommonUtils implements Serializable {
         }
     }
 
+    public static boolean isLegalUrl(String url) {
+        try {
+            new URI(url);
+            return true;
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
 }
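
Worth noting: the new isLegalUrl only checks that the string parses as a java.net.URI (RFC 2396 syntax); it requires neither a scheme nor a host and never touches the network. A small self-contained sketch (class name hypothetical) of what passes and fails:

import java.net.URI;

public class LegalUrlDemo {
    // same logic as the isLegalUrl added above
    static boolean isLegalUrl(String url) {
        try {
            new URI(url);
            return true;
        } catch (Exception ignored) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isLegalUrl("http://jobmanager:8081")); // true
        System.out.println(isLegalUrl("/relative/path"));         // true -- no scheme required
        System.out.println(isLegalUrl("http://bad host:8081"));   // false -- space is illegal
    }
}
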
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 5927012e5..942b408b8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -57,39 +57,22 @@ public class FlinkClusterController {
 
     @PostMapping("check")
     public RestResponse check(FlinkCluster cluster) {
-        String checkResult = flinkClusterService.check(cluster);
+        ResponseResult checkResult = flinkClusterService.check(cluster);
         return RestResponse.success(checkResult);
     }
 
     @PostMapping("create")
     @RequiresPermissions("cluster:create")
     public RestResponse create(FlinkCluster cluster) {
-        ResponseResult result = flinkClusterService.create(cluster);
-        return RestResponse.success(result);
+        Boolean success = flinkClusterService.create(cluster);
+        return RestResponse.success(success);
     }
 
     @PostMapping("update")
     @RequiresPermissions("cluster:update")
     public RestResponse update(FlinkCluster cluster) {
-        FlinkCluster flinkCluster = flinkClusterService.getById(cluster.getId());
-        flinkCluster.setClusterId(cluster.getClusterId());
-        flinkCluster.setVersionId(cluster.getVersionId());
-        flinkCluster.setClusterName(cluster.getClusterName());
-        flinkCluster.setAddress(cluster.getAddress());
-        flinkCluster.setExecutionMode(cluster.getExecutionMode());
-        flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-        flinkCluster.setFlameGraph(cluster.getFlameGraph());
-        flinkCluster.setFlinkImage(cluster.getFlinkImage());
-        flinkCluster.setOptions(cluster.getOptions());
-        flinkCluster.setYarnQueue(cluster.getYarnQueue());
-        flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
-        flinkCluster.setK8sConf(cluster.getK8sConf());
-        flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
-        flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
-        flinkCluster.setResolveOrder(cluster.getResolveOrder());
-        flinkCluster.setServiceAccount(cluster.getServiceAccount());
-        flinkCluster.setDescription(cluster.getDescription());
-        return RestResponse.success(flinkClusterService.update(flinkCluster));
+        flinkClusterService.update(cluster);
+        return RestResponse.success();
     }
 
     @PostMapping("get")
@@ -99,23 +82,20 @@ public class FlinkClusterController {
     }
 
     @PostMapping("start")
-    public RestResponse start(FlinkCluster flinkCluster) {
-        FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult start = flinkClusterService.start(cluster);
-        return RestResponse.success(start);
+    public RestResponse start(FlinkCluster cluster) {
+        flinkClusterService.start(cluster);
+        return RestResponse.success();
     }
 
     @PostMapping("shutdown")
-    public RestResponse shutdown(FlinkCluster flinkCluster) {
-        FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult shutdown = flinkClusterService.shutdown(cluster);
-        return RestResponse.success(shutdown);
+    public RestResponse shutdown(FlinkCluster cluster) {
+        flinkClusterService.shutdown(cluster);
+        return RestResponse.success();
     }
 
     @PostMapping("delete")
-    public RestResponse delete(FlinkCluster flinkCluster) {
-        FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult delete = flinkClusterService.delete(cluster);
-        return RestResponse.success(delete);
+    public RestResponse delete(FlinkCluster cluster) {
+        flinkClusterService.delete(cluster);
+        return RestResponse.success();
     }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
index ee1a9b38a..c7a3046e7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
@@ -34,7 +34,6 @@ import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Maps;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -46,6 +45,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -122,14 +122,14 @@ public class AppBuildPipeline {
     @JsonIgnore
     public Map<Integer, PipelineStepStatus> getStepStatus() {
         if (StringUtils.isBlank(stepStatusJson)) {
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
         try {
             return JacksonUtils.read(stepStatusJson, new TypeReference<HashMap<Integer, PipelineStepStatus>>() {
             });
         } catch (JsonProcessingException e) {
             log.error("json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusJson, e);
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
     }
 
@@ -148,14 +148,14 @@ public class AppBuildPipeline {
     @JsonIgnore
     public Map<Integer, Long> getStepStatusTimestamp() {
         if (StringUtils.isBlank(stepStatusTimestampJson)) {
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
         try {
             return JacksonUtils.read(stepStatusTimestampJson, new TypeReference<HashMap<Integer, Long>>() {
             });
         } catch (JsonProcessingException e) {
             log.error("json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusTimestampJson, e);
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8be656d51..3c2650350 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -226,6 +226,7 @@ public class Application implements Serializable {
     /**
      * the cluster id bound to the task in remote mode
      */
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Long flinkClusterId;
 
     private String description;
@@ -296,7 +297,6 @@ public class Application implements Serializable {
     private transient String createTimeTo;
     private transient String backUpDescription;
     private transient String yarnQueue;
-    private transient String yarnSessionClusterId;
 
     /**
      * Flink Web UI Url
@@ -482,7 +482,10 @@ public class Application implements Serializable {
     @SneakyThrows
     @SuppressWarnings("unchecked")
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
+        if (StringUtils.isBlank(this.options)) {
+            return Collections.emptyMap();
+        }
+        Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
         map.entrySet().removeIf(entry -> entry.getValue() == null);
         return map;
     }
@@ -665,10 +668,6 @@ public class Application implements Serializable {
             if (StringUtils.isNotEmpty(appParam.getYarnQueue())) {
                 hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), appParam.getYarnQueue());
             }
-        } else if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-            if (StringUtils.isNotEmpty(appParam.getYarnSessionClusterId())) {
-                hotParams.put(ConfigConst.KEY_YARN_APP_ID(), appParam.getYarnSessionClusterId());
-            }
         }
         if (!hotParams.isEmpty()) {
             this.setHotParams(JacksonUtils.write(hotParams));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 9ee264226..e9c55b2ec 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -23,7 +23,9 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.flink.submit.FlinkSubmitter;
 
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -35,12 +37,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.Data;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.http.client.config.RequestConfig;
 
 import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -86,8 +90,6 @@ public class FlinkCluster implements Serializable {
 
     private Integer k8sRestExposedType;
 
-    private Boolean flameGraph;
-
     private String k8sConf;
 
     private Integer resolveOrder;
@@ -115,10 +117,15 @@ public class FlinkCluster implements Serializable {
     @JsonIgnore
     @SneakyThrows
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
+        if (StringUtils.isBlank(this.options)) {
+            return Collections.emptyMap();
+        }
+        Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
         if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
             map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
-            map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+            if (StringUtils.isNotEmpty(this.yarnQueue)) {
+                map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+            }
         }
         map.entrySet().removeIf(entry -> entry.getValue() == null);
         return map;
@@ -138,23 +145,38 @@ public class FlinkCluster implements Serializable {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+
+            // 1) check each url is legal
+            for (String url: array) {
+                if (!CommonUtils.isLegalUrl(url)) {
+                    return false;
+                }
             }
+
+            // 2) check connection
+            for (String url : array) {
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
+            }
+            return false;
         }
         return false;
     }
@@ -164,6 +186,9 @@ public class FlinkCluster implements Serializable {
         URI activeAddress = this.getActiveAddress();
         String restUrl = activeAddress.toURL() + "/jobmanager/config";
         String json = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+        if (StringUtils.isEmpty(json)) {
+            return Collections.emptyMap();
+        }
         List<Map<String, String>> confList = JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {
         });
         Map<String, String> config = new HashMap<>(0);
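
For reference, the rewritten verifyClusterConnection probes Flink's REST API: <address>/overview for remote clusters, or the YARN web-proxy path <address>/proxy/<clusterId>/overview for yarn-session clusters, and treats the cluster as reachable only if the response parses as an Overview. A rough standalone equivalent using only the JDK (the address below is an assumption):

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class OverviewProbe {
    public static void main(String[] args) throws Exception {
        // assumed JobManager REST address; a yarn-session cluster would be probed
        // through the proxy, e.g. http://<rm-host>:8088/proxy/<clusterId>/overview
        URL url = new URL("http://localhost:8081/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(2000); // same timeout the entity uses
        try (InputStream in = conn.getInputStream()) {
            String body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            System.out.println(body); // e.g. {"taskmanagers":1,"slots-total":4,...}
        }
    }
}
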
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 88472e956..e8ed4bed0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -57,4 +57,8 @@ public interface ApplicationMapper extends BaseMapper<Application> {
     Boolean existsByJobName(@Param("jobName") String jobName);
 
     List<Application> getByProjectId(@Param("projectId") Long id);
+
+    boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
+
+    boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index 428aa8e99..d9e9b6558 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param;
 
 public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
 
-    Boolean existsByClusterId(@Param("clusterId") String clusterId);
+    Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id);
 
     Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index c3dfc493a..274ab0d13 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -98,4 +98,7 @@ public interface ApplicationService extends IService<Application> {
 
     void forcedStop(Application app);
 
+    boolean existsRunningJobByClusterId(Long clusterId);
+
+    boolean existsJobByClusterId(Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index a2c525441..ecf775994 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -24,19 +24,19 @@ import com.baomidou.mybatisplus.extension.service.IService;
 
 public interface FlinkClusterService extends IService<FlinkCluster> {
 
-    String check(FlinkCluster flinkCluster);
+    ResponseResult check(FlinkCluster flinkCluster);
 
-    ResponseResult create(FlinkCluster flinkCluster);
+    Boolean create(FlinkCluster flinkCluster);
 
-    ResponseResult delete(FlinkCluster flinkCluster);
+    void delete(FlinkCluster flinkCluster);
 
-    ResponseResult update(FlinkCluster flinkCluster);
+    void update(FlinkCluster flinkCluster);
 
-    ResponseResult start(FlinkCluster flinkCluster);
+    void start(FlinkCluster flinkCluster);
 
-    ResponseResult shutdown(FlinkCluster flinkCluster);
+    void shutdown(FlinkCluster flinkCluster);
 
-    Boolean existsByClusterId(String clusterId);
+    Boolean existsByClusterId(String clusterId, Long id);
 
     Boolean existsByClusterName(String clusterName, Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index e19e3c371..7daf79764 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -73,7 +73,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +83,7 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -442,14 +442,14 @@ public class AppBuildPipeServiceImpl
     @Override
     public Map<Long, PipelineStatus> listPipelineStatus(List<Long> appIds) {
         if (CollectionUtils.isEmpty(appIds)) {
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
         LambdaQueryWrapper<AppBuildPipeline> queryWrapper = new LambdaQueryWrapper<AppBuildPipeline>()
             .in(AppBuildPipeline::getAppId, appIds);
 
         List<AppBuildPipeline> appBuildPipelines = baseMapper.selectList(queryWrapper);
         if (CollectionUtils.isEmpty(appBuildPipelines)) {
-            return Maps.newHashMap();
+            return Collections.emptyMap();
         }
         return appBuildPipelines.stream().collect(Collectors.toMap(e -> e.getAppId(), e -> e.getPipelineStatus()));
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index b253dccdb..42ff934c4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
@@ -436,10 +437,19 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
             envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv);
             envInitializer.storageInitialize(application.getStorageType());
+
+            if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
+                || ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+                FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
+                boolean conned = flinkCluster.verifyClusterConnection();
+                if (!conned) {
+                    throw new ApiAlertException("the target cluster is unavailable, please check!");
+                }
+            }
             return true;
         } catch (Exception e) {
             log.error(ExceptionUtils.stringifyException(e));
-            throw new ApplicationException(e);
+            throw new ApiDetailException(e);
         }
     }
 
@@ -519,6 +529,26 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (exists) {
+            return true;
+        }
+        for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+            if (clusterId.equals(application.getFlinkClusterId()) &&
+                FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean existsJobByClusterId(Long clusterId) {
+        return baseMapper.existsJobByClusterId(clusterId);
+    }
+
     @Override
     public String getYarnName(Application appParam) {
         String[] args = new String[2];
@@ -711,9 +741,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     public boolean update(Application appParam) {
         try {
             Application application = getById(appParam.getId());
-
             application.setLaunch(LaunchState.NEED_LAUNCH.get());
-
             if (application.isUploadJob()) {
                 if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
                     application.setBuild(true);
@@ -776,9 +804,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             application.setCpFailureAction(appParam.getCpFailureAction());
             application.setCpFailureRateInterval(appParam.getCpFailureRateInterval());
             application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval());
-            application.setFlinkClusterId(appParam.getFlinkClusterId());
             application.setTags(appParam.getTags());
 
+            if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum()) ||
+                ExecutionMode.YARN_PER_JOB.equals(application.getExecutionModeEnum()) ||
+                ExecutionMode.KUBERNETES_NATIVE_APPLICATION.equals(application.getExecutionModeEnum())) {
+                application.setFlinkClusterId(null);
+            }
+
             // Flink Sql job...
             if (application.isFlinkSqlJob()) {
                 updateFlinkSqlJob(application, appParam);
@@ -942,7 +975,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         if (startFuture == null && cancelFuture == null) {
             this.updateToStopped(app);
         }
-
     }
 
     @Override
@@ -998,10 +1030,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
                     application.setYarnQueue(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()).toString());
                 }
-            } else if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_ID())) {
-                    application.setYarnSessionClusterId(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID()).toString());
-                }
             }
         }
         return application;
@@ -1052,6 +1080,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         application.setOptionTime(new Date());
         this.baseMapper.updateById(application);
 
+        Long userId = commonService.getUserId();
+        if (!application.getUserId().equals(userId)) {
+            FlinkTrackingTask.addCanceledApp(application.getId(), userId);
+        }
+
         FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
         // infer savepoint
@@ -1063,6 +1096,21 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
         }
 
+        String clusterId = null;
+        if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
+            clusterId = application.getClusterId();
+        } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+            if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+                FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
+                AssertUtils.state(cluster != null,
+                    String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
+                clusterId = cluster.getClusterId();
+            } else {
+                clusterId = application.getAppId();
+            }
+        }
+
         Map<String, Object> properties = new HashMap<>();
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1074,26 +1122,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             URI activeAddress = cluster.getActiveAddress();
             properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
             properties.put(RestOptions.PORT.key(), activeAddress.getPort());
-        } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
-            if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-                AssertUtils.state(cluster != null,
-                    String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
-                        "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
-                properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
-            }
         }
 
-        Long userId = commonService.getUserId();
-        if (!application.getUserId().equals(userId)) {
-            FlinkTrackingTask.addCanceledApp(application.getId(), userId);
-        }
-        String clusterId = null;
-        if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
-            clusterId = application.getClusterId();
-        } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            clusterId = application.getAppId();
-        }
         CancelRequest cancelRequest = new CancelRequest(
             flinkEnv.getFlinkVersion(),
             ExecutionMode.of(application.getExecutionMode()),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index da14fa90a..27a69da6d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,19 +17,19 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
+import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.task.FlinkTrackingTask;
 import org.apache.streampark.flink.submit.FlinkSubmitter;
 import org.apache.streampark.flink.submit.bean.DeployRequest;
@@ -49,10 +49,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -80,66 +77,76 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     @Autowired
     private CommonService commonService;
 
+
     @Autowired
-    private SettingService settingService;
+    private ApplicationService applicationService;
 
     @Override
-    public String check(FlinkCluster cluster) {
-        if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
-            return "error";
-        }
-        //1) Check if name is duplicate, if it already exists
+    public ResponseResult check(FlinkCluster cluster) {
+        ResponseResult result = new ResponseResult();
+        result.setStatus(0);
+
+        // 1) Check if the cluster name already exists
         Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId());
         if (existsByClusterName) {
-            return "exists";
+            result.setMsg("clusterName is already exists,please check!");
+            result.setStatus(1);
+            return result;
         }
 
-        if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
-            //2) Check if the connection can be made to
-            return cluster.verifyConnection() ? "success" : "fail";
+        // 2) Check if the target cluster already exists
+        String clusterId = cluster.getClusterId();
+        if (StringUtils.isNotEmpty(clusterId)) {
+            Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId());
+            if (existsByClusterId) {
+                result.setMsg("the clusterId " + clusterId + " is already exists,please check!");
+                result.setStatus(2);
+                return result;
+            }
         }
-        return "success";
-    }
 
-    @Override
-    public ResponseResult create(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
-        if (StringUtils.isBlank(flinkCluster.getClusterName())) {
-            result.setMsg("clusterName can't empty!");
-            result.setStatus(0);
-            return result;
-        }
-        String clusterId = flinkCluster.getClusterId();
-        if (StringUtils.isNoneBlank(clusterId)) {
-            Boolean existsByClusterId = this.existsByClusterId(clusterId);
-            if (existsByClusterId) {
-                result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
-                result.setStatus(0);
+        // 3) Check connection
+        if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
+            if (!cluster.verifyClusterConnection()) {
+                result.setMsg("the remote cluster connection failed, please check!");
+                result.setStatus(3);
                 return result;
             }
+        } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) {
+            if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) {
+                if (!cluster.verifyClusterConnection()) {
+                    result.setMsg("the flink cluster connection failed, please check!");
+                    result.setStatus(4);
+                    return result;
+                }
+            }
         }
+
+        return result;
+    }
+
+    @Override
+    public Boolean create(FlinkCluster flinkCluster) {
         flinkCluster.setUserId(commonService.getUserId());
         flinkCluster.setCreateTime(new Date());
-        // remote mode directly set STARTED
-        if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+        if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+            if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) {
+                flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+            } else {
+                flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+            }
+        } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
             flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         } else {
             flinkCluster.setClusterState(ClusterState.CREATED.getValue());
         }
-        try {
-            save(flinkCluster);
-            result.setStatus(1);
-        } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("create cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-        }
-        return result;
+        return save(flinkCluster);
     }
 
     @Override
     @Transactional(rollbackFor = {Exception.class})
-    public ResponseResult start(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void start(FlinkCluster cluster) {
+        FlinkCluster flinkCluster = getById(cluster.getId());
         LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
         updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
         try {
@@ -158,67 +165,68 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                         flinkCluster.getK8sRestExposedTypeEnum());
                     break;
                 default:
-                    result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
-                    result.setStatus(0);
-                    return result;
+                    throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
             }
             FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
             DeployRequest deployRequest = new DeployRequest(
                 flinkEnv.getFlinkVersion(),
                 flinkCluster.getClusterId(),
                 executionModeEnum,
-                flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null,
                 flinkCluster.getProperties(),
                 kubernetesDeployParam
             );
             log.info("deploy cluster request " + deployRequest);
             Future<DeployResponse> future = executorService.submit(() -> FlinkSubmitter.deploy(deployRequest));
             DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-            if (null != deployResponse) {
-                if (deployResponse.message() == null) {
-                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
-                    updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
-                    updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
-                    updateWrapper.set(FlinkCluster::getException, null);
-                    update(updateWrapper);
-                    result.setStatus(1);
-                    FlinkTrackingTask.removeFlinkCluster(flinkCluster);
-                } else {
-                    result.setStatus(0);
-                    result.setMsg("deploy cluster failed," + deployResponse.message());
-                }
+            if (deployResponse != null) {
+                updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+                updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
+                updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
+                updateWrapper.set(FlinkCluster::getException, null);
+                update(updateWrapper);
+                FlinkTrackingTask.removeFlinkCluster(flinkCluster);
             } else {
-                result.setStatus(0);
-                result.setMsg("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+                throw new ApiAlertException("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
             }
-            return result;
         } catch (Exception e) {
             log.error(e.getMessage(), e);
             updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
-            result.setStatus(0);
-            result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-            return result;
+            throw new ApiDetailException(e);
         }
     }
 
     @Override
-    public ResponseResult update(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void update(FlinkCluster cluster) {
+        FlinkCluster flinkCluster = getById(cluster.getId());
+        flinkCluster.setClusterId(cluster.getClusterId());
+        flinkCluster.setVersionId(cluster.getVersionId());
+        flinkCluster.setClusterName(cluster.getClusterName());
+        flinkCluster.setAddress(cluster.getAddress());
+        flinkCluster.setExecutionMode(cluster.getExecutionMode());
+        flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
+        flinkCluster.setFlinkImage(cluster.getFlinkImage());
+        flinkCluster.setOptions(cluster.getOptions());
+        flinkCluster.setYarnQueue(cluster.getYarnQueue());
+        flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
+        flinkCluster.setK8sConf(cluster.getK8sConf());
+        flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
+        flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
+        flinkCluster.setResolveOrder(cluster.getResolveOrder());
+        flinkCluster.setServiceAccount(cluster.getServiceAccount());
+        flinkCluster.setDescription(cluster.getDescription());
         try {
             updateById(flinkCluster);
-            result.setStatus(1);
         } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+            throw new ApiDetailException("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
         }
-        return result;
     }
 
     @Override
-    public ResponseResult shutdown(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void shutdown(FlinkCluster cluster) {
+        FlinkCluster flinkCluster = this.getById(cluster.getId());
+        // 1) check mode
         ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
         String clusterId = flinkCluster.getClusterId();
         KubernetesDeployParam kubernetesDeployParam = null;
@@ -235,15 +243,35 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                     flinkCluster.getK8sRestExposedTypeEnum());
                 break;
             default:
-                result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
-                result.setStatus(0);
-                return result;
+                throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
         }
         if (StringUtils.isBlank(clusterId)) {
-            result.setMsg("the clusterId is Empty!");
-            result.setStatus(0);
-            return result;
+            throw new ApiAlertException("the clusterId can not be empty!");
         }
+
+        LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
+        updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
+
+        // 2) check if the cluster is active
+        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum) || ExecutionMode.REMOTE.equals(executionModeEnum)) {
+            if (ClusterState.STARTED.equals(ClusterState.of(flinkCluster.getClusterState()))) {
+                if (!flinkCluster.verifyClusterConnection()) {
+                    updateWrapper.set(FlinkCluster::getClusterState, ClusterState.LOST.getValue());
+                    update(updateWrapper);
+                    throw new ApiAlertException("current cluster is not active, please check");
+                }
+            } else {
+                throw new ApiAlertException("current cluster is not active, please check");
+            }
+        }
+
+        // 3) check if any job is running on the cluster
+        boolean existsRunningJob = applicationService.existsRunningJobByClusterId(flinkCluster.getId());
+        if (existsRunningJob) {
+            throw new ApiAlertException("some app is running on this cluster, the cluster cannot be shutdown");
+        }
+
+        // 4) shutdown
         FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
         ShutDownRequest stopRequest = new ShutDownRequest(
             flinkEnv.getFlinkVersion(),
@@ -252,33 +280,27 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             kubernetesDeployParam,
             flinkCluster.getProperties()
         );
-        LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
-        updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
+
         try {
             Future<ShutDownResponse> future = executorService.submit(() -> FlinkSubmitter.shutdown(stopRequest));
             ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
-            if (null != shutDownResponse) {
+            if (shutDownResponse != null) {
                 updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
                 update(updateWrapper);
-                result.setStatus(1);
-                return result;
+            } else {
+                throw new ApiAlertException("get shutdown response failed");
             }
-            result.setStatus(1);
-            result.setMsg("clusterId is not exists!");
-            return result;
         } catch (Exception e) {
             log.error(e.getMessage(), e);
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
-            result.setStatus(0);
-            result.setMsg("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-            return result;
+            throw new ApiDetailException("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
         }
     }
 
     @Override
-    public Boolean existsByClusterId(String clusterId) {
-        return this.baseMapper.existsByClusterId(clusterId);
+    public Boolean existsByClusterId(String clusterId, Long id) {
+        return this.baseMapper.existsByClusterId(clusterId, id);
     }
 
     @Override
@@ -287,35 +309,16 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     @Override
-    public ResponseResult delete(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
-        if (StringUtils.isNoneBlank(flinkCluster.getClusterId())
-            && ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())
-            && !ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
-            result = shutdown(flinkCluster);
-            if (0 == result.getStatus()) {
-                return result;
-            }
+    public void delete(FlinkCluster cluster) {
+        Long id = cluster.getId();
+        FlinkCluster flinkCluster = getById(id);
+        if (flinkCluster == null) {
+            throw new ApiAlertException("the flink cluster does not exist, please check.");
         }
-        try {
-            removeById(flinkCluster.getId());
-            result.setStatus(1);
-        } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("delete cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+        if (applicationService.existsJobByClusterId(id)) {
+            throw new ApiAlertException("some apps exist on this cluster, the cluster cannot be deleted, please check.");
         }
-        return result;
+        removeById(id);
     }
 
-    private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) {
-        Map<String, Serializable> flameGraph = new HashMap<>(8);
-        flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter");
-        flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType());
-        flameGraph.put("id", flinkCluster.getId());
-        flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report"));
-        flameGraph.put("token", Utils.uuid());
-        flameGraph.put("sampleInterval", 1000 * 60 * 2);
-        flameGraph.put("metricInterval", 1000 * 60 * 2);
-        return flameGraph;
-    }
 }
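
For context, the reworked shutdown() above now validates in a fixed order before it ever contacts the cluster: state check, running-job check, then the actual ShutDownRequest. A condensed, illustrative Java sketch of that guard ordering, using plain types rather than the project's service and mapper classes (only the ordering and the messages mirror the patch):

    // Illustrative only: the guard ordering of shutdown(), with plain Java types
    // standing in for FlinkCluster / ApplicationService / FlinkSubmitter.
    final class ShutdownGuard {

        static void checkBeforeShutdown(boolean started, boolean reachable, boolean hasRunningJob) {
            // 2) only a STARTED cluster that still answers its REST endpoint is
            //    eligible; the real service marks an unreachable one as LOST first.
            if (!started || !reachable) {
                throw new IllegalStateException("the current cluster is not active, please check");
            }
            // 3) refuse while any application is still running on the cluster.
            if (hasRunningJob) {
                throw new IllegalStateException("some app is running on this cluster, the cluster cannot be shut down");
            }
            // 4) only now does the real service submit the ShutDownRequest and wait
            //    up to 60 seconds for the Future<ShutDownResponse>.
        }
    }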
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 2d54fe758..5cf0f13f8 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -409,7 +409,6 @@ create table if not exists `t_flink_cluster` (
   `dynamic_properties` text comment 'allows specifying multiple generic configuration options',
   `k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
   `k8s_hadoop_integration` tinyint default 0,
-  `flame_graph` tinyint default 0 comment 'flameGraph enable,default disable',
   `k8s_conf` varchar(255) default null comment 'the path where the k 8 s configuration file is located',
   `resolve_order` tinyint default null,
   `exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index d68bb17e6..0e3795da7 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -32,6 +32,7 @@
         <result column="app_id" jdbcType="VARCHAR" property="appId"/>
         <result column="version_id" jdbcType="BIGINT" property="versionId"/>
         <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
+        <result column="flink_cluster_id" jdbcType="BIGINT" property="flinkClusterId"/>
         <result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
         <result column="k8s_namespace" jdbcType="VARCHAR" property="k8sNamespace"/>
         <result column="app_type" jdbcType="INTEGER" property="appType"/>
@@ -98,6 +99,21 @@
         limit 1
     </select>
 
+    <select id="existsJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+        select count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}
+        limit 1
+    </select>
+
+    <select id="existsRunningJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+        select count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}
+        and state = 5
+        limit 1
+    </select>
+
     <select id="getByProjectId" resultType="org.apache.streampark.console.core.entity.Application" parameterType="java.lang.Long">
         select * from t_flink_app where project_id=#{projectId}
     </select>
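
The two queries above back the new delete and shutdown guards in the service layer. A minimal sketch of the corresponding interface methods (the method names appear in the diff; the javadoc wording is illustrative, and reading the hardcoded state = 5 as the RUNNING application state is an assumption):

    // Sketch of the service-layer methods backed by the two queries above.
    public interface ApplicationService {

        /** true if any application references the cluster; used as the delete guard. */
        Boolean existsJobByClusterId(Long clusterId);

        /** true if an application on the cluster is in state 5 (presumably RUNNING); used as the shutdown guard. */
        Boolean existsRunningJobByClusterId(Long clusterId);
    }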
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 1e1a85c9d..c459c4b72 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -36,7 +36,6 @@
         <result column="dynamic_properties" jdbcType="LONGVARCHAR" property="dynamicProperties"/>
         <result column="k8s_rest_exposed_type" jdbcType="TINYINT" property="k8sRestExposedType"/>
         <result column="k8s_hadoop_integration" jdbcType="BOOLEAN" property="k8sHadoopIntegration"/>
-        <result column="flame_graph" jdbcType="BOOLEAN" property="flameGraph"/>
         <result column="k8s_conf" jdbcType="VARCHAR" property="k8sConf"/>
         <result column="resolve_order" jdbcType="INTEGER" property="resolveOrder"/>
         <result column="exception" jdbcType="LONGVARCHAR" property="exception"/>
@@ -47,7 +46,12 @@
     <select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String">
         select count(1)
         from t_flink_cluster
-        where cluster_id=#{clusterId}
+        <where>
+            cluster_id=#{clusterId}
+            <if test="id != null">
+                and id &lt;&gt; #{id}
+            </if>
+        </where>
         limit 1
     </select>
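
With the extra id parameter, the uniqueness check can exclude the row being edited, so an update no longer collides with itself. A hedged sketch of the intended call pattern (the two-argument existsByClusterId is in the diff; the surrounding helper is hypothetical):

    // Hypothetical helper showing how the two-argument check is meant to be called.
    void checkClusterIdUnique(FlinkClusterService clusterService, String clusterId, Long selfId) {
        // selfId == null on create: any row with this clusterId is a conflict.
        // selfId != null on update: that row is excluded via "and id <> #{id}".
        if (clusterService.existsByClusterId(clusterId, selfId)) {
            throw new IllegalArgumentException("the clusterId already exists: " + clusterId);
        }
    }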
 
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index b6abd4089..fb5f1777b 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -126,7 +126,6 @@ export interface AppListRecord {
   createTimeTo?: any;
   backUpDescription?: any;
   yarnQueue?: any;
-  yarnSessionClusterId?: any;
   teamIdList?: any;
   teamName: string;
   flinkRestUrl?: any;
@@ -178,5 +177,4 @@ export interface CreateParams {
   clusterId: string;
   flinkClusterId: string;
   flinkImage?: any;
-  yarnSessionClusterId?: any;
 }
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index b1949498c..2fb05ea34 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -130,8 +130,10 @@ export enum ClusterStateEnum {
   CREATED = 0,
   /** cluster started */
   STARTED = 1,
-  /** cluster stopped */
-  STOPED = 2,
+  /** cluster canceled */
+  CANCELED = 2,
+  /** cluster lost */
+  LOST = 3,
 }
 
 export enum AppTypeEnum {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index b68483776..9c2907ce9 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -41,7 +41,6 @@ export default {
   podTemplate: 'Kubernetes Pod Template',
   flinkCluster: 'Flink Cluster',
   yarnQueue: 'Yarn Queue',
-  yarnSessionClusterId: 'Yarn Session ClusterId',
   mavenPom: 'Maven pom',
   uploadJar: 'Upload Jar',
   kubernetesNamespace: 'Kubernetes Namespace',
@@ -212,7 +211,6 @@ export default {
     tmPlaceholder: 'Please select the resource parameters to set',
     yarnQueuePlaceholder: 'Please enter yarn queue',
     descriptionPlaceholder: 'Please enter description for this application',
-    yarnSessionClusterIdPlaceholder: 'Please Select Yarn Session clusterId',
     kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g: default',
     kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId',
     kubernetesClusterIdIsRequiredMessage: 'Kubernetes clusterId is required',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
index 8e3ffd32d..6c16c7890 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
@@ -89,7 +89,7 @@ export default {
     },
     required: {
       address: 'cluster address is required',
-      clusterId: 'Yarn Session ClusterId is required',
+      clusterId: 'Yarn Session Cluster is required',
     },
   },
   env: {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index e86789515..ed01275be 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -41,7 +41,6 @@ export default {
   podTemplate: 'Kubernetes Pod 模板',
   flinkCluster: 'Flink集群',
   yarnQueue: 'Yarn队列',
-  yarnSessionClusterId: 'Yarn session模式集群',
   mavenPom: 'maven pom',
   uploadJar: '上传依赖Jar文件',
   kubernetesNamespace: 'K8S命名空间',
@@ -207,7 +206,6 @@ export default {
     tmPlaceholder: '请选择要设置的资源参数',
     yarnQueuePlaceholder: '请输入yarn队列名称',
     descriptionPlaceholder: '请输入此应用程序的描述',
-    yarnSessionClusterIdPlaceholder: '请选择 Yarn Session 集群',
     kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
     kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
     kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
index b4107a5d1..800027de2 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
@@ -88,7 +88,7 @@ export default {
     },
     required: {
       address: '必须填写集群地址',
-      clusterId: 'Yarn Session ClusterId 为必填项',
+      clusterId: 'Yarn Session Cluster 为必填项',
     },
   },
   env: {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 9979c3220..0682c30c8 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -156,19 +156,10 @@
       unref(flinkClusters).filter((c) => {
         if (values.flinkClusterId) {
           return c.id == values.flinkClusterId && c.clusterState === ClusterStateEnum.STARTED;
-        } else {
-          return (
-            c.clusterId == values.yarnSessionClusterId &&
-            c.clusterState === ClusterStateEnum.STARTED
-          );
         }
       })[0] || null;
     if (cluster) {
-      Object.assign(values, {
-        clusterId: cluster.id,
-        flinkClusterId: cluster.id,
-        yarnSessionClusterId: cluster.clusterId,
-      });
+      Object.assign(values, { flinkClusterId: cluster.id });
     }
   }
 
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 33729ed3f..53925d714 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -104,7 +104,6 @@
         flinkImage: app.flinkImage,
         k8sNamespace: app.k8sNamespace,
         alertId: selectAlertId,
-        yarnSessionClusterId: app.yarnSessionClusterId,
         projectName: app.projectName,
         module: app.module,
         ...resetParams,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 21571fb7b..520167d53 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -48,7 +48,7 @@
   import { useGo } from '/@/hooks/web/usePage';
   import ProgramArgs from './components/ProgramArgs.vue';
   import VariableReview from './components/VariableReview.vue';
-  import { ClusterStateEnum, JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum';
+  import { JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum';
 
   const route = useRoute();
   const go = useGo();
@@ -77,7 +77,6 @@
     alerts,
     flinkEnvs,
     flinkSql,
-    flinkClusters,
     getEditStreamParkFormSchema,
     registerDifferentDrawer,
     suggestions,
@@ -124,7 +123,6 @@
         flinkClusterId: app.flinkClusterId,
         flinkImage: app.flinkImage,
         k8sNamespace: app.k8sNamespace,
-        yarnSessionClusterId: app.yarnSessionClusterId,
         ...resetParams,
       };
       console.log('resetParams', resetParams);
@@ -195,17 +193,6 @@
           jar: unref(uploadJars),
         });
       }
-      if (values.yarnSessionClusterId) {
-        const cluster =
-          flinkClusters.value.filter(
-            (c) =>
-              c.clusterId === values.yarnSessionClusterId &&
-              c.clusterState === ClusterStateEnum.STARTED,
-          )[0] || null;
-        values.clusterId = cluster.id;
-        values.flinkClusterId = cluster.id;
-        values.yarnSessionClusterId = cluster.clusterId;
-      }
       let config = values.configOverride;
       if (config != null && config.trim() !== '') {
         config = encryptByBase64(config);
@@ -245,18 +232,6 @@
       } else {
         config = null;
       }
-      if (values.yarnSessionClusterId) {
-        const cluster =
-          flinkClusters.value.filter((c) => {
-            return (
-              c.clusterId === values.yarnSessionClusterId &&
-              c.clusterState === ClusterStateEnum.STARTED
-            );
-          })[0] || null;
-        values.clusterId = cluster.id;
-        values.flinkClusterId = cluster.id;
-        values.yarnSessionClusterId = cluster.clusterId;
-      }
       const configId = values.strategy == UseStrategyEnum.USE_EXIST ? app.configId : null;
       const params = {
         id: app.id,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 96fa616d5..b1610042a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -221,14 +221,12 @@ export const useCreateAndEditSchema = (
         rules: [{ required: true, message: 'Flink Cluster is required' }],
       },
       {
-        field: 'yarnSessionClusterId',
-        label: t('flink.app.yarnSessionClusterId'),
+        field: 'flinkClusterId',
+        label: t('flink.app.flinkCluster'),
         component: 'Select',
-        componentProps: () => {
-          return {
-            placeholder: t('flink.app.addAppTips.yarnSessionClusterIdPlaceholder'),
-            options: getExecutionCluster(ExecModeEnum.YARN_SESSION, 'clusterId'),
-          };
+        componentProps: {
+          placeholder: t('flink.app.flinkCluster'),
+          options: getExecutionCluster(ExecModeEnum.YARN_SESSION, 'id'),
         },
         ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
         rules: [{ required: true, message: 'Flink Cluster is required' }],
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
index 920e166e8..76f81eec9 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
@@ -74,25 +74,25 @@ export const useFlinkSchema = (editModel?: string) => {
         field: 'flinkClusterId',
         label: t('flink.app.flinkCluster'),
         component: 'Select',
-        componentProps: {
-          placeholder: t('flink.app.flinkCluster'),
-          options: (getExecutionCluster(ExecModeEnum.REMOTE) || []).map((i) => ({
-            label: i.clusterName,
-            value: i.id,
-          })),
+        componentProps: () => {
+          const options = getExecutionCluster(ExecModeEnum.REMOTE);
+          return {
+            placeholder: t('flink.app.flinkCluster'),
+            options: options.map((i) => ({ label: i.clusterName, value: i.id })),
+          };
         },
         ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
         rules: [{ required: true, message: 'Flink Cluster is required' }],
       },
       {
-        field: 'yarnSessionClusterId',
-        label: t('flink.app.yarnSessionClusterId'),
+        field: 'flinkClusterId',
+        label: t('flink.app.flinkCluster'),
         component: 'Select',
         componentProps: () => {
           const options = getExecutionCluster(ExecModeEnum.YARN_SESSION);
           return {
-            placeholder: t('flink.app.addAppTips.yarnSessionClusterIdPlaceholder'),
-            options: options.map((i) => ({ label: i.clusterName, value: i.clusterId })),
+            placeholder: t('flink.app.flinkCluster'),
+            options: options.map((i) => ({ label: i.clusterName, value: i.id })),
           };
         },
         ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 21d5d1944..25097521a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -271,7 +271,6 @@ export function handleSubmitParams(
     clusterId: values.clusterId || null,
     flinkClusterId: values.flinkClusterId || null,
     flinkImage: values.flinkImage || null,
-    yarnSessionClusterId: values.yarnSessionClusterId || null,
   });
   if (params.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
     Object.assign(params, {
@@ -291,7 +290,3 @@ export const filterOption = (input: string, options: Recordable) => {
 export function isK8sExecMode(mode: number): boolean {
   return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode);
 }
-// session mode
-export function isSessionMode(mode: number): boolean {
-  return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
-}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index 0ee7c4afb..3a001023e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -50,9 +50,10 @@
       const params = handleSubmitParams(values);
       if (Object.keys(params).length > 0) {
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
+        const status = parseInt(res.status);
+        if (status === 0) {
           const resp = await fetchCreateCluster(params);
-          if (resp.status) {
+          if (resp) {
             Swal.fire({
               icon: 'success',
               title: values.clusterName.concat(' create successful!'),
@@ -61,20 +62,10 @@
             });
             go('/flink/setting?activeKey=cluster');
           } else {
-            Swal.fire(resp.msg);
+            Swal.fire('Failed', 'failed to create cluster, please check the log', 'error');
           }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 0b58d3842..fc1c11676 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -65,31 +65,18 @@
           id: cluster.id,
         });
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
-          const resp = await fetchUpdateCluster(params);
-          if (resp.status) {
-            Swal.fire({
-              icon: 'success',
-              title: values.clusterName.concat(' update successful!'),
-              showConfirmButton: false,
-              timer: 2000,
-            });
-            go('/flink/setting?activeKey=cluster');
-          } else {
-            Swal.fire(resp.data.msg);
-          }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
+        const status = parseInt(res.status);
+        if (status === 0) {
+          await fetchUpdateCluster(params);
+          Swal.fire({
+            icon: 'success',
+            title: values.clusterName.concat(' update successful!'),
+            showConfirmButton: false,
+            timer: 2000,
+          });
+          go('/flink/setting?activeKey=cluster');
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
@@ -122,7 +109,6 @@
         flinkImage: cluster.flinkImage,
         serviceAccount: cluster.serviceAccount,
         k8sConf: cluster.k8sConf,
-        flameGraph: cluster.flameGraph,
         k8sNamespace: cluster.k8sNamespace,
         ...resetParams,
       });
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
index c9ee6b867..0d290255a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
@@ -16,7 +16,6 @@
 -->
 <script lang="ts">
   import { defineComponent } from 'vue';
-  import { exceptionPropWidth } from '/@/utils';
   import { ClusterStateEnum, ExecModeEnum } from '/@/enums/flinkEnum';
   export default defineComponent({
     name: 'FlinkClusterSetting',
@@ -52,73 +51,28 @@
   const { t } = useI18n();
   const { Swal, createMessage } = useMessage();
   const clusters = ref<FlinkCluster[]>([]);
-  const optionClusters = {
-    starting: new Map(),
-    created: new Map(),
-    stoped: new Map(),
-  };
   function isSessionMode(mode: number): boolean {
     return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
   }
 
-  /* Get flink environmental data*/
-  async function getFlinkClusterSetting() {
-    const clusterList = await fetchFlinkCluster();
-    clusters.value = clusterList;
-    for (const key in clusterList) {
-      const cluster = clusterList[key];
-      if (cluster.clusterState === ClusterStateEnum.CREATED) {
-        optionClusters.created.set(cluster.id, new Date().getTime());
-      } else if (cluster.clusterState === ClusterStateEnum.STARTED) {
-        optionClusters.starting.set(cluster.id, new Date().getTime());
-      } else {
-        optionClusters.stoped.set(cluster.id, new Date().getTime());
-      }
-    }
-  }
-
   function handleIsStart(item) {
-    /**
-     The cluster was just created but not started
-     CREATED(0),
-     cluster started
-     STARTED(1),
-     cluster stopped
-     STOPED(2);
-    */
-    return optionClusters.starting.get(item.id);
+    return item.clusterState === ClusterStateEnum.STARTED;
   }
   /* Go to edit cluster */
   function handleEditCluster(item: FlinkCluster) {
     go(`/flink/setting/edit_cluster?clusterId=${item.id}`);
   }
   /* deploy */
-  async function handleDeployCluser(item: FlinkCluster) {
+  async function handleDeployCluster(item: FlinkCluster) {
     const hide = createMessage.loading('The current cluster is starting', 0);
     try {
-      const { data } = await fetchClusterStart(item.id);
-      if (data?.data?.status) {
-        // optionClusters.starting.set(item.id, new Date().getTime());
-        handleMapUpdate('starting');
-        getFlinkClusterSetting();
-        Swal.fire({
-          icon: 'success',
-          title: 'The current cluster is started',
-          showConfirmButton: false,
-          timer: 2000,
-        });
-      } else {
-        Swal.fire({
-          title: 'Failed',
-          icon: 'error',
-          width: exceptionPropWidth(),
-          html: '<pre class="propsException">' + data?.data?.msg + '</pre>',
-          showCancelButton: true,
-          confirmButtonColor: '#55BDDDFF',
-          confirmButtonText: 'OK',
-          cancelButtonText: 'Close',
-        });
-      }
+      await fetchClusterStart(item.id);
+      await Swal.fire({
+        icon: 'success',
+        title: 'The current cluster has been started',
+        showConfirmButton: false,
+        timer: 2000,
+      });
     } catch (error) {
       console.error(error);
     } finally {
@@ -127,36 +81,15 @@
   }
   /* delete */
   async function handleDelete(item: FlinkCluster) {
-    const { data } = await fetchClusterRemove(item.id);
-    if (data?.data?.status) {
-      // optionClusters.starting.delete(item.id);
-      handleMapUpdate('starting');
-      getFlinkClusterSetting();
-      createMessage.success('The current cluster is remove');
-    }
+    await fetchClusterRemove(item.id);
+    createMessage.success('The current cluster has been removed');
   }
   /* shutdown */
   async function handleShutdownCluster(item: FlinkCluster) {
     const hide = createMessage.loading('The current cluster is canceling', 0);
     try {
-      const { data } = await fetchClusterShutdown(item.id);
-      if (data?.data?.status) {
-        // optionClusters.starting.delete(item.id);
-        handleMapUpdate('starting');
-        getFlinkClusterSetting();
-        createMessage.success('The current cluster is shutdown');
-      } else {
-        Swal.fire({
-          title: 'Failed',
-          icon: 'error',
-          width: exceptionPropWidth(),
-          html: '<pre class="propsException">' + data.data.msg + '</pre>',
-          showCancelButton: true,
-          confirmButtonColor: '#55BDDDFF',
-          confirmButtonText: 'OK',
-          cancelButtonText: 'Close',
-        });
-      }
+      await fetchClusterShutdown(item.id);
+      createMessage.success('The current cluster has been shut down');
     } catch (error) {
       console.error(error);
     } finally {
@@ -164,13 +97,14 @@
     }
   }
 
-  function handleMapUpdate(type: string) {
-    const map = optionClusters[type];
-    optionClusters[type] = new Map(map);
+  async function getFlinkCluster() {
+    const clusterList = await fetchFlinkCluster();
+    clusters.value = clusterList;
   }
 
   onMounted(() => {
-    getFlinkClusterSetting();
+    getFlinkCluster();
+    setInterval(() => getFlinkCluster(), 1000 * 3);
   });
 </script>
 <template>
@@ -218,19 +152,8 @@
       <template #actions>
         <Tooltip :title="t('flink.setting.cluster.edit')">
           <a-button
-            v-if="handleIsStart(item) && item.executionMode == ExecModeEnum.YARN_SESSION"
-            v-auth="'cluster:update'"
-            :disabled="true"
-            @click="handleEditCluster(item)"
-            shape="circle"
-            size="large"
-            class="control-button"
-          >
-            <EditOutlined />
-          </a-button>
-          <a-button
-            v-if="!handleIsStart(item) || item.executionMode == ExecModeEnum.REMOTE"
             v-auth="'cluster:update'"
+            :disabled="handleIsStart(item)"
             @click="handleEditCluster(item)"
             shape="circle"
             size="large"
@@ -239,40 +162,10 @@
             <EditOutlined />
           </a-button>
         </Tooltip>
-        <template v-if="!handleIsStart(item)">
-          <Tooltip :title="t('flink.setting.cluster.start')">
-            <a-button
-              v-if="isSessionMode(item.executionMode)"
-              v-auth="'cluster:create'"
-              @click="handleDeployCluser(item)"
-              shape="circle"
-              size="large"
-              class="control-button"
-            >
-              <PlayCircleOutlined />
-            </a-button>
-            <a-button
-              v-else
-              :disabled="true"
-              v-auth="'cluster:create'"
-              shape="circle"
-              size="large"
-              style="margin-left: 3px"
-              class="control-button"
-            >
-              <PlayCircleOutlined />
-            </a-button>
-          </Tooltip>
-        </template>
-
-        <template v-else>
+        <template v-if="handleIsStart(item)">
           <Tooltip :title="t('flink.setting.cluster.stop')">
             <a-button
-              v-if="
-                [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(
-                  item.executionMode,
-                )
-              "
+              :disabled="item.executionMode === ExecModeEnum.REMOTE"
               v-auth="'cluster:create'"
               @click="handleShutdownCluster(item)"
               shape="circle"
@@ -282,38 +175,31 @@
             >
               <PauseCircleOutlined />
             </a-button>
+          </Tooltip>
+        </template>
+        <template v-else>
+          <Tooltip :title="t('flink.setting.cluster.start')">
             <a-button
-              v-else
-              :disabled="true"
+              :disabled="!isSessionMode(item.executionMode)"
               v-auth="'cluster:create'"
+              @click="handleDeployCluster(item)"
               shape="circle"
               size="large"
               class="control-button"
             >
-              <PauseCircleOutlined />
+              <PlayCircleOutlined />
             </a-button>
           </Tooltip>
         </template>
-
         <Tooltip :title="t('flink.setting.cluster.detail')">
           <a-button
-            v-if="!handleIsStart(item)"
+            :disabled="!handleIsStart(item)"
             v-auth="'app:detail'"
-            :disabled="true"
             shape="circle"
-            size="large"
-            class="control-button"
-          >
-            <EyeOutlined />
-          </a-button>
-          <a-button
-            v-else
-            v-auth="'app:detail'"
-            shape="circle"
-            size="large"
-            class="control-button"
             :href="item.address"
             target="_blank"
+            size="large"
+            class="control-button"
           >
             <EyeOutlined />
           </a-button>
@@ -325,7 +211,13 @@
           :ok-text="t('common.yes')"
           @confirm="handleDelete(item)"
         >
-          <a-button type="danger" shape="circle" size="large" class="control-button">
+          <a-button
+            :disabled="item.clusterState === ClusterStateEnum.STARTED"
+            type="danger"
+            shape="circle"
+            size="large"
+            class="control-button"
+          >
             <DeleteOutlined />
           </a-button>
         </Popconfirm>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 77de7dca3..0cd2a5ab3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -34,7 +34,7 @@ import {
   fetchK8sNamespaces,
   fetchSessionClusterIds,
 } from '/@/api/flink/app/flinkHistory';
-import { handleFormValue, isSessionMode } from '../../app/utils';
+import { handleFormValue } from '../../app/utils';
 import { useMessage } from '/@/hooks/web/useMessage';
 import { ClusterAddTypeEnum } from '/@/enums/appEnum';
 import { useI18n } from '/@/hooks/web/useI18n';
@@ -89,6 +89,22 @@ export const useClusterSetting = () => {
       }
     }
   }
+
+  function isAddExistYarnSession(value: Recordable) {
+    return (
+      value.executionMode == ExecModeEnum.YARN_SESSION &&
+      value.addType == ClusterAddTypeEnum.ADD_EXISTING
+    );
+  }
+
+  // session mode
+  function isShowInSessionMode(value: Recordable): boolean {
+    if (value.executionMode == ExecModeEnum.YARN_SESSION) {
+      return value.addType == ClusterAddTypeEnum.ADD_NEW;
+    }
+    return value.executionMode == ExecModeEnum.KUBERNETES_SESSION;
+  }
+
   const getClusterSchema = computed((): FormSchema[] => {
     return [
       {
@@ -127,18 +143,10 @@ export const useClusterSetting = () => {
         },
         rules: [{ required: true, message: 'Flink Version is required' }],
       },
-      {
-        field: 'yarnQueue',
-        label: 'Yarn Queue',
-        component: 'Input',
-        componentProps: {
-          placeholder: 'Please enter yarn queue',
-        },
-        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
-      },
       {
         field: 'addType',
         label: t('flink.setting.cluster.form.addType'),
+        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
         component: 'Select',
         defaultValue: ClusterAddTypeEnum.ADD_EXISTING,
         componentProps: {
@@ -156,6 +164,17 @@ export const useClusterSetting = () => {
           ],
         },
       },
+      {
+        field: 'yarnQueue',
+        label: 'Yarn Queue',
+        component: 'Input',
+        componentProps: {
+          placeholder: 'Please enter yarn queue',
+        },
+        ifShow: ({ values }) =>
+          values.executionMode == ExecModeEnum.YARN_SESSION &&
+          values.addType == ClusterAddTypeEnum.ADD_NEW,
+      },
       {
         field: 'address',
         label: 'Address',
@@ -168,19 +187,18 @@ export const useClusterSetting = () => {
                 : 'Please enter cluster address,  e.g: http://host:port',
           };
         },
-        ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING,
+        ifShow: ({ values }) =>
+          isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE,
         rules: [{ required: true, message: t('flink.setting.cluster.required.address') }],
       },
       {
         field: 'clusterId',
-        label: 'Yarn Session ClusterId',
+        label: 'Yarn Session Cluster',
         component: 'Input',
         componentProps: {
-          placeholder: 'Please enter Yarn Session clusterId',
+          placeholder: 'Please enter Yarn Session cluster',
         },
-        ifShow: ({ values }) =>
-          values.addType == ClusterAddTypeEnum.ADD_EXISTING &&
-          values.executionMode == ExecModeEnum.YARN_SESSION,
+        ifShow: ({ values }) => isAddExistYarnSession(values),
         rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }],
       },
       {
@@ -254,15 +272,15 @@ export const useClusterSetting = () => {
       {
         field: 'resolveOrder',
         label: 'Resolve Order',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder },
-        rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }],
+        rules: [{ message: 'Resolve Order is required', type: 'number' }],
       },
       {
         field: 'slot',
         label: 'Task Slots',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'InputNumber',
         componentProps: {
           placeholder: 'Number of slots per TaskManager',
@@ -274,14 +292,14 @@ export const useClusterSetting = () => {
       {
         field: 'totalOptions',
         label: 'Total Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams),
       },
       {
         field: 'totalItem',
         label: 'totalItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'totalOptions', field, '.memory', true),
@@ -289,7 +307,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptions',
         label: 'JM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -304,7 +322,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptionsItem',
         label: 'jmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'),
@@ -312,7 +330,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptions',
         label: 'TM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -327,7 +345,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptionsItem',
         label: 'tmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'),
@@ -335,7 +353,7 @@ export const useClusterSetting = () => {
       {
         field: 'dynamicProperties',
         label: 'Dynamic Properties',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Input',
         render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams),
       },
@@ -353,7 +371,6 @@ export const useClusterSetting = () => {
   function handleSubmitParams(values: Recordable) {
     const options = handleFormValue(values);
     const params = {
-      clusterId: values.clusterId || null,
       clusterName: values.clusterName,
       executionMode: values.executionMode,
       versionId: values.versionId,
@@ -366,17 +383,23 @@ export const useClusterSetting = () => {
         });
         return params;
       case ExecModeEnum.YARN_SESSION:
-        Object.assign(params, {
-          options: JSON.stringify(options),
-          yarnQueue: values.yarnQueue || 'default',
-          dynamicProperties: values.dynamicProperties,
-          resolveOrder: values.resolveOrder,
-          address: values.address,
-          flameGraph: values.flameGraph,
-        });
+        if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
+          Object.assign(params, {
+            clusterId: values.clusterId,
+            address: values.address,
+          });
+        } else {
+          Object.assign(params, {
+            options: JSON.stringify(options),
+            yarnQueue: values.yarnQueue || 'default',
+            dynamicProperties: values.dynamicProperties,
+            resolveOrder: values.resolveOrder,
+          });
+        }
         return params;
       case ExecModeEnum.KUBERNETES_SESSION:
         Object.assign(params, {
+          clusterId: values.clusterId,
           options: JSON.stringify(options),
           dynamicProperties: values.dynamicProperties,
           resolveOrder: values.resolveOrder,
@@ -386,7 +409,6 @@ export const useClusterSetting = () => {
           k8sConf: values.k8sConf,
           flinkImage: values.flinkImage || null,
           address: values.address,
-          flameGraph: values.flameGraph,
         });
         return params;
       default:
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 278d46758..264653d49 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 case class DeployRequest(flinkVersion: FlinkVersion,
                          clusterId: String,
                          executionMode: ExecutionMode,
-                         flameGraph: JavaMap[String, java.io.Serializable],
                          properties: JavaMap[String, Any],
                          @Nullable k8sDeployParam: KubernetesDeployParam) {
 
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index f352732da..800de3059 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -17,6 +17,4 @@
 
 package org.apache.streampark.flink.submit.bean
 
-case class DeployResponse(address: String,
-                          clusterId: String,
-                          message: String = null)
+case class DeployResponse(address: String, clusterId: String)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index cc701f310..84cc59363 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -88,5 +88,4 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
     super.doCancel(cancelRequest, flinkConfig)
   }
 
-
 }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 9e91914ff..9872d082a 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
          |    exposedType      : ${deployRequest.k8sDeployParam.flinkRestExposedType}
          |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
          |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
         client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
       if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
       } else {
         null
       }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
index 5022e6ec1..7cf8a7d43 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
@@ -50,8 +50,7 @@ object LocalSubmit extends FlinkSubmitTrait {
       val jobGraph = packageProgramJobGraph._2
       client = createLocalCluster(flinkConfig)
       val jobId = client.submitJob(jobGraph).get().toString
-      val result = SubmitResponse(jobId, flinkConfig.toMap, jobId)
-      result
+      SubmitResponse(jobId, flinkConfig.toMap, jobId)
     } catch {
       case e: Exception =>
         logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 7c87a4ae7..e8a84264f 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -30,7 +30,6 @@ import java.io.File
 import java.lang.{Integer => JavaInt}
 import scala.util.{Failure, Success, Try}
 
-
 /**
  * Submit Job to Remote Cluster
  */
@@ -66,7 +65,7 @@ object RemoteSubmit extends FlinkSubmitTrait {
     try {
       client = standAloneDescriptor._2.retrieve(standAloneDescriptor._1).getClusterClient
       val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = cancelJob(cancelRequest, jobID, client)
+      val actionResult = super.cancelJob(cancelRequest, jobID, client)
       CancelResponse(actionResult)
     } catch {
       case e: Exception =>
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 21b216e77..3fa415c46 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.flink.submit.impl
 
-import org.apache.streampark.common.conf.ConfigConst.KEY_YARN_APP_ID
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.submit.`trait`.YarnSubmitTrait
 import org.apache.streampark.flink.submit.bean._
@@ -49,7 +48,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-
     logInfo(
       s"""
          |------------------------------------------------------------------
@@ -137,8 +135,9 @@ object YarnSessionSubmit extends YarnSubmitTrait {
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.properties.get(KEY_YARN_APP_ID).toString)
-    flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    flinkConfig
+      .safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.clusterId)
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(
       s"""
          |------------------------------------------------------------------
@@ -153,7 +152,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
       clusterDescriptor = yarnClusterDescriptor._2
       client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
       val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = cancelJob(cancelRequest, jobID, client)
+      val actionResult = super.cancelJob(cancelRequest, jobID, client)
       CancelResponse(actionResult)
     } catch {
       case e: Exception => logError(s"stop flink yarn session job fail")
@@ -172,7 +171,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -230,7 +228,8 @@ object YarnSessionSubmit extends YarnSubmitTrait {
       clusterDescriptor = yarnClusterDescriptor._2
       if (FinalApplicationStatus.UNDEFINED.equals(clusterDescriptor.getYarnClient.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId)).getFinalApplicationStatus)) {
         val clientProvider = clusterDescriptor.retrieve(yarnClusterDescriptor._1)
-        clientProvider.getClusterClient.shutDownCluster()
+        client = clientProvider.getClusterClient
+        client.shutDownCluster()
       }
       logInfo(s"the ${shutDownRequest.clusterId}'s final status is ${clusterDescriptor.getYarnClient.getApplicationReport(ConverterUtils.toApplicationId(shutDownRequest.clusterId)).getFinalApplicationStatus}")
       ShutDownResponse()
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
index 7e7d85e82..a9c3a57f4 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
@@ -95,10 +95,9 @@ trait KubernetesNativeSubmitTrait extends FlinkSubmitTrait {
       clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
       client = clusterDescriptor.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)).getClusterClient
       val jobID = JobID.fromHexString(cancelRequest.jobId)
-      val actionResult = cancelJob(cancelRequest, jobID, client)
+      val actionResult = super.cancelJob(cancelRequest, jobID, client)
       IngressController.deleteIngress(cancelRequest.clusterId, cancelRequest.kubernetesNamespace)
       CancelResponse(actionResult)
-
     } catch {
       case e: Exception =>
         logger.error(s"[flink-submit] stop flink job failed, mode=${flinkConfig.get(DeploymentOptions.TARGET)}, cancelRequest=${cancelRequest}")
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
index 1735f6bf9..c667b15fe 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
@@ -37,9 +37,7 @@ import scala.util.Try
 trait YarnSubmitTrait extends FlinkSubmitTrait {
 
   override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
-
     val jobID = getJobID(cancelRequest.jobId)
-
     val clusterClient = {
       flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.clusterId)
       val clusterClientFactory = new YarnClusterClientFactory
@@ -51,7 +49,7 @@ trait YarnSubmitTrait extends FlinkSubmitTrait {
       clusterDescriptor.retrieve(applicationId).getClusterClient
     }
     Try {
-      val savepointDir = cancelJob(cancelRequest, jobID, clusterClient)
+      val savepointDir = super.cancelJob(cancelRequest, jobID, clusterClient)
       CancelResponse(savepointDir)
     }.recover {
       case e => throw new FlinkException(s"[StreamPark] Triggering a savepoint for the job ${cancelRequest.jobId} failed. detail: ${ExceptionUtils.stringifyException(e)}");