You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text version.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 18:04:36 UTC

[incubator-streampark] 01/02: [Bug] flink yarn session cluster bug fixed

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit d2f67fd60afe82ff65d5cea3ee2e93d0dd95dbc2
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 02:02:31 2022 +0800

    [Bug] flink yarn session cluster bug fixed
---
 .../core/controller/FlinkClusterController.java    |  3 +-
 .../console/core/entity/Application.java           |  9 ++-
 .../console/core/entity/FlinkCluster.java          | 46 +++++++++--
 .../console/core/mapper/FlinkClusterMapper.java    |  2 +-
 .../console/core/service/FlinkClusterService.java  |  4 +-
 .../core/service/impl/FlinkClusterServiceImpl.java | 93 +++++++++++-----------
 .../resources/mapper/core/FlinkClusterMapper.xml   |  7 +-
 .../src/views/flink/app/utils/index.ts             |  4 -
 .../src/views/flink/setting/AddCluster.vue         | 15 +---
 .../src/views/flink/setting/EditCluster.vue        | 16 +---
 .../views/flink/setting/hooks/useClusterSetting.ts | 85 ++++++++++++--------
 .../flink/submit/bean/DeployRequest.scala          |  1 -
 .../impl/KubernetesNativeSessionSubmit.scala       |  3 +-
 .../flink/submit/impl/YarnSessionSubmit.scala      |  1 -
 14 files changed, 160 insertions(+), 129 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 5927012e5..2482ad13c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -57,7 +57,7 @@ public class FlinkClusterController {
 
     @PostMapping("check")
     public RestResponse check(FlinkCluster cluster) {
-        String checkResult = flinkClusterService.check(cluster);
+        ResponseResult checkResult = flinkClusterService.check(cluster);
         return RestResponse.success(checkResult);
     }
 
@@ -78,7 +78,6 @@ public class FlinkClusterController {
         flinkCluster.setAddress(cluster.getAddress());
         flinkCluster.setExecutionMode(cluster.getExecutionMode());
         flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-        flinkCluster.setFlameGraph(cluster.getFlameGraph());
         flinkCluster.setFlinkImage(cluster.getFlinkImage());
         flinkCluster.setOptions(cluster.getOptions());
         flinkCluster.setYarnQueue(cluster.getYarnQueue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8be656d51..7765869bd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -482,9 +482,12 @@ public class Application implements Serializable {
     @SneakyThrows
     @SuppressWarnings("unchecked")
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
-        map.entrySet().removeIf(entry -> entry.getValue() == null);
-        return map;
+        if (StringUtils.isNotEmpty(this.options)) {
+            Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+            map.entrySet().removeIf(entry -> entry.getValue() == null);
+            return map;
+        }
+        return Collections.emptyMap();
     }
 
     @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 9ee264226..965bfbda4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.flink.submit.FlinkSubmitter;
 
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -35,12 +36,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.Data;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.http.client.config.RequestConfig;
 
 import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -86,8 +89,6 @@ public class FlinkCluster implements Serializable {
 
     private Integer k8sRestExposedType;
 
-    private Boolean flameGraph;
-
     private String k8sConf;
 
     private Integer resolveOrder;
@@ -115,13 +116,18 @@ public class FlinkCluster implements Serializable {
     @JsonIgnore
     @SneakyThrows
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
-        if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
-            map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
-            map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+        if (StringUtils.isNotEmpty(this.options)) {
+            Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+            if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+                map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
+                if (StringUtils.isNotEmpty(this.yarnQueue)) {
+                    map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+                }
+            }
+            map.entrySet().removeIf(entry -> entry.getValue() == null);
+            return map;
         }
-        map.entrySet().removeIf(entry -> entry.getValue() == null);
-        return map;
+        return Collections.emptyMap();
     }
 
     @JsonIgnore
@@ -159,6 +165,30 @@ public class FlinkCluster implements Serializable {
         return false;
     }
 
+    public boolean verifyFlinkYarnCluster() {
+        if (address == null) {
+            return false;
+        }
+        String[] array = address.split(",");
+        for (String url : array) {
+            try {
+                new URI(url);
+            } catch (Exception ignored) {
+                return false;
+            }
+            try {
+                String format = "%s/proxy/%s/overview";
+                String yarnSessionUrl = String.format(format, url, this.clusterId);
+                String result = HttpClientUtils.httpGetRequest(yarnSessionUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                JacksonUtils.read(result, Overview.class);
+                return true;
+            } catch (Exception ignored) {
+                //
+            }
+        }
+        return false;
+    }
+
     @JsonIgnore
     public Map<String, String> getFlinkConfig() throws MalformedURLException, JsonProcessingException {
         URI activeAddress = this.getActiveAddress();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index 428aa8e99..d9e9b6558 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param;
 
 public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
 
-    Boolean existsByClusterId(@Param("clusterId") String clusterId);
+    Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id);
 
     Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index a2c525441..2be56cb3e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -24,7 +24,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
 
 public interface FlinkClusterService extends IService<FlinkCluster> {
 
-    String check(FlinkCluster flinkCluster);
+    ResponseResult check(FlinkCluster flinkCluster);
 
     ResponseResult create(FlinkCluster flinkCluster);
 
@@ -36,7 +36,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
     ResponseResult shutdown(FlinkCluster flinkCluster);
 
-    Boolean existsByClusterId(String clusterId);
+    Boolean existsByClusterId(String clusterId, Long id);
 
     Boolean existsByClusterName(String clusterName, Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index da14fa90a..8217aee40 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,11 +17,10 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -49,10 +48,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -84,44 +80,60 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     private SettingService settingService;
 
     @Override
-    public String check(FlinkCluster cluster) {
-        if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
-            return "error";
-        }
-        //1) Check if name is duplicate, if it already exists
+    public ResponseResult check(FlinkCluster cluster) {
+        ResponseResult result = new ResponseResult();
+        result.setStatus(0);
+
+        //1) Check name is already exists
         Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId());
         if (existsByClusterName) {
-            return "exists";
+            result.setMsg("clusterName is already exists,please check!");
+            result.setStatus(1);
+            return result;
+        }
+
+        //2) Check target-cluster is already exists
+        String clusterId = cluster.getClusterId();
+        if (StringUtils.isNotEmpty(clusterId)) {
+            Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId());
+            if (existsByClusterId) {
+                result.setMsg("the clusterId " + clusterId + " is already exists,please check!");
+                result.setStatus(2);
+                return result;
+            }
         }
 
+        // 3) Check connection
         if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
-            //2) Check if the connection can be made to
-            return cluster.verifyConnection() ? "success" : "fail";
+            if (!cluster.verifyConnection()) {
+                result.setMsg("the remote cluster connection failed, please check!");
+                result.setStatus(3);
+                return result;
+            }
+        } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) {
+            if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) {
+                if (!cluster.verifyFlinkYarnCluster()) {
+                    result.setMsg("the flink cluster connection failed, please check!");
+                    result.setStatus(4);
+                    return result;
+                }
+            }
         }
-        return "success";
+        return result;
     }
 
     @Override
     public ResponseResult create(FlinkCluster flinkCluster) {
         ResponseResult result = new ResponseResult();
-        if (StringUtils.isBlank(flinkCluster.getClusterName())) {
-            result.setMsg("clusterName can't empty!");
-            result.setStatus(0);
-            return result;
-        }
-        String clusterId = flinkCluster.getClusterId();
-        if (StringUtils.isNoneBlank(clusterId)) {
-            Boolean existsByClusterId = this.existsByClusterId(clusterId);
-            if (existsByClusterId) {
-                result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
-                result.setStatus(0);
-                return result;
-            }
-        }
         flinkCluster.setUserId(commonService.getUserId());
         flinkCluster.setCreateTime(new Date());
-        // remote mode directly set STARTED
-        if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+        if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+            if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) {
+                flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+            } else {
+                flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+            }
+        } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
             flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         } else {
             flinkCluster.setClusterState(ClusterState.CREATED.getValue());
@@ -167,7 +179,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                 flinkEnv.getFlinkVersion(),
                 flinkCluster.getClusterId(),
                 executionModeEnum,
-                flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null,
                 flinkCluster.getProperties(),
                 kubernetesDeployParam
             );
@@ -176,8 +187,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
             if (null != deployResponse) {
                 if (deployResponse.message() == null) {
-                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
                     updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
                     updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
                     updateWrapper.set(FlinkCluster::getException, null);
                     update(updateWrapper);
@@ -198,8 +209,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
             result.setStatus(0);
-            result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-            return result;
+            throw new ApiDetailException(e);
         }
     }
 
@@ -277,8 +287,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     @Override
-    public Boolean existsByClusterId(String clusterId) {
-        return this.baseMapper.existsByClusterId(clusterId);
+    public Boolean existsByClusterId(String clusterId, Long id) {
+        return this.baseMapper.existsByClusterId(clusterId, id);
     }
 
     @Override
@@ -307,15 +317,4 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         return result;
     }
 
-    private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) {
-        Map<String, Serializable> flameGraph = new HashMap<>(8);
-        flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter");
-        flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType());
-        flameGraph.put("id", flinkCluster.getId());
-        flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report"));
-        flameGraph.put("token", Utils.uuid());
-        flameGraph.put("sampleInterval", 1000 * 60 * 2);
-        flameGraph.put("metricInterval", 1000 * 60 * 2);
-        return flameGraph;
-    }
 }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 1e1a85c9d..e22a44869 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -47,7 +47,12 @@
     <select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String">
         select count(1)
         from t_flink_cluster
-        where cluster_id=#{clusterId}
+        <where>
+            cluster_id=#{clusterId}
+            <if test="id != null">
+                and id &lt;&gt; #{id}
+            </if>
+        </where>
         limit 1
     </select>
 
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 21d5d1944..4f1b42492 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -291,7 +291,3 @@ export const filterOption = (input: string, options: Recordable) => {
 export function isK8sExecMode(mode: number): boolean {
   return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode);
 }
-// session mode
-export function isSessionMode(mode: number): boolean {
-  return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
-}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index 0ee7c4afb..f67f0cbe7 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -50,7 +50,8 @@
       const params = handleSubmitParams(values);
       if (Object.keys(params).length > 0) {
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
+        const status = parseInt(res.status);
+        if (status === 0) {
           const resp = await fetchCreateCluster(params);
           if (resp.status) {
             Swal.fire({
@@ -63,18 +64,8 @@
           } else {
             Swal.fire(resp.msg);
           }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 0b58d3842..32836455e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -65,7 +65,8 @@
           id: cluster.id,
         });
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
+        const status = parseInt(res.status);
+        if (status === 0) {
           const resp = await fetchUpdateCluster(params);
           if (resp.status) {
             Swal.fire({
@@ -78,18 +79,8 @@
           } else {
             Swal.fire(resp.data.msg);
           }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
@@ -122,7 +113,6 @@
         flinkImage: cluster.flinkImage,
         serviceAccount: cluster.serviceAccount,
         k8sConf: cluster.k8sConf,
-        flameGraph: cluster.flameGraph,
         k8sNamespace: cluster.k8sNamespace,
         ...resetParams,
       });
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 77de7dca3..74698ad46 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -34,7 +34,7 @@ import {
   fetchK8sNamespaces,
   fetchSessionClusterIds,
 } from '/@/api/flink/app/flinkHistory';
-import { handleFormValue, isSessionMode } from '../../app/utils';
+import { handleFormValue } from '../../app/utils';
 import { useMessage } from '/@/hooks/web/useMessage';
 import { ClusterAddTypeEnum } from '/@/enums/appEnum';
 import { useI18n } from '/@/hooks/web/useI18n';
@@ -89,6 +89,22 @@ export const useClusterSetting = () => {
       }
     }
   }
+
+  function isAddExistYarnSession(value: Recordable) {
+    return (
+      value.executionMode == ExecModeEnum.YARN_SESSION &&
+      value.addType == ClusterAddTypeEnum.ADD_EXISTING
+    );
+  }
+
+  // session mode
+  function isShowInSessionMode(value: Recordable): boolean {
+    if (value.executionMode == ExecModeEnum.YARN_SESSION) {
+      return value.addType == ClusterAddTypeEnum.ADD_NEW;
+    }
+    return value.executionMode == ExecModeEnum.KUBERNETES_SESSION;
+  }
+
   const getClusterSchema = computed((): FormSchema[] => {
     return [
       {
@@ -127,18 +143,10 @@ export const useClusterSetting = () => {
         },
         rules: [{ required: true, message: 'Flink Version is required' }],
       },
-      {
-        field: 'yarnQueue',
-        label: 'Yarn Queue',
-        component: 'Input',
-        componentProps: {
-          placeholder: 'Please enter yarn queue',
-        },
-        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
-      },
       {
         field: 'addType',
         label: t('flink.setting.cluster.form.addType'),
+        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
         component: 'Select',
         defaultValue: ClusterAddTypeEnum.ADD_EXISTING,
         componentProps: {
@@ -156,6 +164,17 @@ export const useClusterSetting = () => {
           ],
         },
       },
+      {
+        field: 'yarnQueue',
+        label: 'Yarn Queue',
+        component: 'Input',
+        componentProps: {
+          placeholder: 'Please enter yarn queue',
+        },
+        ifShow: ({ values }) =>
+          values.executionMode == ExecModeEnum.YARN_SESSION &&
+          values.addType == ClusterAddTypeEnum.ADD_NEW,
+      },
       {
         field: 'address',
         label: 'Address',
@@ -168,7 +187,8 @@ export const useClusterSetting = () => {
                 : 'Please enter cluster address,  e.g: http://host:port',
           };
         },
-        ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING,
+        ifShow: ({ values }) =>
+          isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE,
         rules: [{ required: true, message: t('flink.setting.cluster.required.address') }],
       },
       {
@@ -178,9 +198,7 @@ export const useClusterSetting = () => {
         componentProps: {
           placeholder: 'Please enter Yarn Session clusterId',
         },
-        ifShow: ({ values }) =>
-          values.addType == ClusterAddTypeEnum.ADD_EXISTING &&
-          values.executionMode == ExecModeEnum.YARN_SESSION,
+        ifShow: ({ values }) => isAddExistYarnSession(values),
         rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }],
       },
       {
@@ -254,7 +272,7 @@ export const useClusterSetting = () => {
       {
         field: 'resolveOrder',
         label: 'Resolve Order',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder },
         rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }],
@@ -262,7 +280,7 @@ export const useClusterSetting = () => {
       {
         field: 'slot',
         label: 'Task Slots',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'InputNumber',
         componentProps: {
           placeholder: 'Number of slots per TaskManager',
@@ -274,14 +292,14 @@ export const useClusterSetting = () => {
       {
         field: 'totalOptions',
         label: 'Total Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams),
       },
       {
         field: 'totalItem',
         label: 'totalItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'totalOptions', field, '.memory', true),
@@ -289,7 +307,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptions',
         label: 'JM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -304,7 +322,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptionsItem',
         label: 'jmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'),
@@ -312,7 +330,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptions',
         label: 'TM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -327,7 +345,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptionsItem',
         label: 'tmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'),
@@ -335,7 +353,7 @@ export const useClusterSetting = () => {
       {
         field: 'dynamicProperties',
         label: 'Dynamic Properties',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Input',
         render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams),
       },
@@ -366,14 +384,18 @@ export const useClusterSetting = () => {
         });
         return params;
       case ExecModeEnum.YARN_SESSION:
-        Object.assign(params, {
-          options: JSON.stringify(options),
-          yarnQueue: values.yarnQueue || 'default',
-          dynamicProperties: values.dynamicProperties,
-          resolveOrder: values.resolveOrder,
-          address: values.address,
-          flameGraph: values.flameGraph,
-        });
+        if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
+          Object.assign(params, {
+            address: values.address,
+          });
+        } else {
+          Object.assign(params, {
+            options: JSON.stringify(options),
+            yarnQueue: values.yarnQueue || 'default',
+            dynamicProperties: values.dynamicProperties,
+            resolveOrder: values.resolveOrder,
+          });
+        }
         return params;
       case ExecModeEnum.KUBERNETES_SESSION:
         Object.assign(params, {
@@ -386,7 +408,6 @@ export const useClusterSetting = () => {
           k8sConf: values.k8sConf,
           flinkImage: values.flinkImage || null,
           address: values.address,
-          flameGraph: values.flameGraph,
         });
         return params;
       default:
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 278d46758..264653d49 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 case class DeployRequest(flinkVersion: FlinkVersion,
                          clusterId: String,
                          executionMode: ExecutionMode,
-                         flameGraph: JavaMap[String, java.io.Serializable],
                          properties: JavaMap[String, Any],
                          @Nullable k8sDeployParam: KubernetesDeployParam) {
 
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 9e91914ff..9872d082a 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
          |    exposedType      : ${deployRequest.k8sDeployParam.flinkRestExposedType}
          |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
          |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
         client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
       if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
       } else {
         null
       }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 21b216e77..128771131 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)