You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 18:04:35 UTC

[incubator-streampark] branch session-cluster created (now 452929131)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a change to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


      at 452929131 [Improve] drop column flame_graph in t_flink_cluster

This branch includes the following new commits:

     new d2f67fd60 [Bug] flink yarn session cluster bug fixed
     new 452929131 [Improve] drop column flame_graph in t_flink_cluster

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampark] 01/02: [Bug] flink yarn session cluster bug fixed

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit d2f67fd60afe82ff65d5cea3ee2e93d0dd95dbc2
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 02:02:31 2022 +0800

    [Bug] flink yarn session cluster bug fixed
---
 .../core/controller/FlinkClusterController.java    |  3 +-
 .../console/core/entity/Application.java           |  9 ++-
 .../console/core/entity/FlinkCluster.java          | 46 +++++++++--
 .../console/core/mapper/FlinkClusterMapper.java    |  2 +-
 .../console/core/service/FlinkClusterService.java  |  4 +-
 .../core/service/impl/FlinkClusterServiceImpl.java | 93 +++++++++++-----------
 .../resources/mapper/core/FlinkClusterMapper.xml   |  7 +-
 .../src/views/flink/app/utils/index.ts             |  4 -
 .../src/views/flink/setting/AddCluster.vue         | 15 +---
 .../src/views/flink/setting/EditCluster.vue        | 16 +---
 .../views/flink/setting/hooks/useClusterSetting.ts | 85 ++++++++++++--------
 .../flink/submit/bean/DeployRequest.scala          |  1 -
 .../impl/KubernetesNativeSessionSubmit.scala       |  3 +-
 .../flink/submit/impl/YarnSessionSubmit.scala      |  1 -
 14 files changed, 160 insertions(+), 129 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 5927012e5..2482ad13c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -57,7 +57,7 @@ public class FlinkClusterController {
 
     @PostMapping("check")
     public RestResponse check(FlinkCluster cluster) {
-        String checkResult = flinkClusterService.check(cluster);
+        ResponseResult checkResult = flinkClusterService.check(cluster);
         return RestResponse.success(checkResult);
     }
 
@@ -78,7 +78,6 @@ public class FlinkClusterController {
         flinkCluster.setAddress(cluster.getAddress());
         flinkCluster.setExecutionMode(cluster.getExecutionMode());
         flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-        flinkCluster.setFlameGraph(cluster.getFlameGraph());
         flinkCluster.setFlinkImage(cluster.getFlinkImage());
         flinkCluster.setOptions(cluster.getOptions());
         flinkCluster.setYarnQueue(cluster.getYarnQueue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8be656d51..7765869bd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -482,9 +482,12 @@ public class Application implements Serializable {
     @SneakyThrows
     @SuppressWarnings("unchecked")
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
-        map.entrySet().removeIf(entry -> entry.getValue() == null);
-        return map;
+        if (StringUtils.isNotEmpty(this.options)) {
+            Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+            map.entrySet().removeIf(entry -> entry.getValue() == null);
+            return map;
+        }
+        return Collections.emptyMap();
     }
 
     @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 9ee264226..965bfbda4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.flink.submit.FlinkSubmitter;
 
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -35,12 +36,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.Data;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.http.client.config.RequestConfig;
 
 import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -86,8 +89,6 @@ public class FlinkCluster implements Serializable {
 
     private Integer k8sRestExposedType;
 
-    private Boolean flameGraph;
-
     private String k8sConf;
 
     private Integer resolveOrder;
@@ -115,13 +116,18 @@ public class FlinkCluster implements Serializable {
     @JsonIgnore
     @SneakyThrows
     public Map<String, Object> getOptionMap() {
-        Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
-        if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
-            map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
-            map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+        if (StringUtils.isNotEmpty(this.options)) {
+            Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+            if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+                map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
+                if (StringUtils.isNotEmpty(this.yarnQueue)) {
+                    map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+                }
+            }
+            map.entrySet().removeIf(entry -> entry.getValue() == null);
+            return map;
         }
-        map.entrySet().removeIf(entry -> entry.getValue() == null);
-        return map;
+        return Collections.emptyMap();
     }
 
     @JsonIgnore
@@ -159,6 +165,30 @@ public class FlinkCluster implements Serializable {
         return false;
     }
 
+    public boolean verifyFlinkYarnCluster() {
+        if (address == null) {
+            return false;
+        }
+        String[] array = address.split(",");
+        for (String url : array) {
+            try {
+                new URI(url);
+            } catch (Exception ignored) {
+                return false;
+            }
+            try {
+                String format = "%s/proxy/%s/overview";
+                String yarnSessionUrl = String.format(format, url, this.clusterId);
+                String result = HttpClientUtils.httpGetRequest(yarnSessionUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                JacksonUtils.read(result, Overview.class);
+                return true;
+            } catch (Exception ignored) {
+                //
+            }
+        }
+        return false;
+    }
+
     @JsonIgnore
     public Map<String, String> getFlinkConfig() throws MalformedURLException, JsonProcessingException {
         URI activeAddress = this.getActiveAddress();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index 428aa8e99..d9e9b6558 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param;
 
 public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
 
-    Boolean existsByClusterId(@Param("clusterId") String clusterId);
+    Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id);
 
     Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index a2c525441..2be56cb3e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -24,7 +24,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
 
 public interface FlinkClusterService extends IService<FlinkCluster> {
 
-    String check(FlinkCluster flinkCluster);
+    ResponseResult check(FlinkCluster flinkCluster);
 
     ResponseResult create(FlinkCluster flinkCluster);
 
@@ -36,7 +36,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
     ResponseResult shutdown(FlinkCluster flinkCluster);
 
-    Boolean existsByClusterId(String clusterId);
+    Boolean existsByClusterId(String clusterId, Long id);
 
     Boolean existsByClusterName(String clusterName, Long id);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index da14fa90a..8217aee40 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,11 +17,10 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -49,10 +48,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -84,44 +80,60 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     private SettingService settingService;
 
     @Override
-    public String check(FlinkCluster cluster) {
-        if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
-            return "error";
-        }
-        //1) Check if name is duplicate, if it already exists
+    public ResponseResult check(FlinkCluster cluster) {
+        ResponseResult result = new ResponseResult();
+        result.setStatus(0);
+
+        //1) Check name is already exists
         Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId());
         if (existsByClusterName) {
-            return "exists";
+            result.setMsg("clusterName is already exists,please check!");
+            result.setStatus(1);
+            return result;
+        }
+
+        //2) Check target-cluster is already exists
+        String clusterId = cluster.getClusterId();
+        if (StringUtils.isNotEmpty(clusterId)) {
+            Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId());
+            if (existsByClusterId) {
+                result.setMsg("the clusterId " + clusterId + " is already exists,please check!");
+                result.setStatus(2);
+                return result;
+            }
         }
 
+        // 3) Check connection
         if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
-            //2) Check if the connection can be made to
-            return cluster.verifyConnection() ? "success" : "fail";
+            if (!cluster.verifyConnection()) {
+                result.setMsg("the remote cluster connection failed, please check!");
+                result.setStatus(3);
+                return result;
+            }
+        } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) {
+            if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) {
+                if (!cluster.verifyFlinkYarnCluster()) {
+                    result.setMsg("the flink cluster connection failed, please check!");
+                    result.setStatus(4);
+                    return result;
+                }
+            }
         }
-        return "success";
+        return result;
     }
 
     @Override
     public ResponseResult create(FlinkCluster flinkCluster) {
         ResponseResult result = new ResponseResult();
-        if (StringUtils.isBlank(flinkCluster.getClusterName())) {
-            result.setMsg("clusterName can't empty!");
-            result.setStatus(0);
-            return result;
-        }
-        String clusterId = flinkCluster.getClusterId();
-        if (StringUtils.isNoneBlank(clusterId)) {
-            Boolean existsByClusterId = this.existsByClusterId(clusterId);
-            if (existsByClusterId) {
-                result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
-                result.setStatus(0);
-                return result;
-            }
-        }
         flinkCluster.setUserId(commonService.getUserId());
         flinkCluster.setCreateTime(new Date());
-        // remote mode directly set STARTED
-        if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+        if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+            if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) {
+                flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+            } else {
+                flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+            }
+        } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
             flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         } else {
             flinkCluster.setClusterState(ClusterState.CREATED.getValue());
@@ -167,7 +179,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                 flinkEnv.getFlinkVersion(),
                 flinkCluster.getClusterId(),
                 executionModeEnum,
-                flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null,
                 flinkCluster.getProperties(),
                 kubernetesDeployParam
             );
@@ -176,8 +187,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
             if (null != deployResponse) {
                 if (deployResponse.message() == null) {
-                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
                     updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
                     updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
                     updateWrapper.set(FlinkCluster::getException, null);
                     update(updateWrapper);
@@ -198,8 +209,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
             result.setStatus(0);
-            result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-            return result;
+            throw new ApiDetailException(e);
         }
     }
 
@@ -277,8 +287,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     @Override
-    public Boolean existsByClusterId(String clusterId) {
-        return this.baseMapper.existsByClusterId(clusterId);
+    public Boolean existsByClusterId(String clusterId, Long id) {
+        return this.baseMapper.existsByClusterId(clusterId, id);
     }
 
     @Override
@@ -307,15 +317,4 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         return result;
     }
 
-    private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) {
-        Map<String, Serializable> flameGraph = new HashMap<>(8);
-        flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter");
-        flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType());
-        flameGraph.put("id", flinkCluster.getId());
-        flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report"));
-        flameGraph.put("token", Utils.uuid());
-        flameGraph.put("sampleInterval", 1000 * 60 * 2);
-        flameGraph.put("metricInterval", 1000 * 60 * 2);
-        return flameGraph;
-    }
 }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 1e1a85c9d..e22a44869 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -47,7 +47,12 @@
     <select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String">
         select count(1)
         from t_flink_cluster
-        where cluster_id=#{clusterId}
+        <where>
+            cluster_id=#{clusterId}
+            <if test="id != null">
+                and id &lt;&gt; #{id}
+            </if>
+        </where>
         limit 1
     </select>
 
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 21d5d1944..4f1b42492 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -291,7 +291,3 @@ export const filterOption = (input: string, options: Recordable) => {
 export function isK8sExecMode(mode: number): boolean {
   return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode);
 }
-// session mode
-export function isSessionMode(mode: number): boolean {
-  return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
-}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index 0ee7c4afb..f67f0cbe7 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -50,7 +50,8 @@
       const params = handleSubmitParams(values);
       if (Object.keys(params).length > 0) {
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
+        const status = parseInt(res.status);
+        if (status === 0) {
           const resp = await fetchCreateCluster(params);
           if (resp.status) {
             Swal.fire({
@@ -63,18 +64,8 @@
           } else {
             Swal.fire(resp.msg);
           }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 0b58d3842..32836455e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -65,7 +65,8 @@
           id: cluster.id,
         });
         const res = await fetchCheckCluster(params);
-        if (res === 'success') {
+        const status = parseInt(res.status);
+        if (status === 0) {
           const resp = await fetchUpdateCluster(params);
           if (resp.status) {
             Swal.fire({
@@ -78,18 +79,8 @@
           } else {
             Swal.fire(resp.data.msg);
           }
-        } else if (res === 'exists') {
-          Swal.fire(
-            'Failed',
-            'the cluster name: ' + values.clusterName + ' is already exists,please check',
-            'error',
-          );
         } else {
-          Swal.fire(
-            'Failed',
-            'the address is invalid or connection failure, please check',
-            'error',
-          );
+          Swal.fire('Failed', res.msg, 'error');
         }
       }
     } catch (error) {
@@ -122,7 +113,6 @@
         flinkImage: cluster.flinkImage,
         serviceAccount: cluster.serviceAccount,
         k8sConf: cluster.k8sConf,
-        flameGraph: cluster.flameGraph,
         k8sNamespace: cluster.k8sNamespace,
         ...resetParams,
       });
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 77de7dca3..74698ad46 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -34,7 +34,7 @@ import {
   fetchK8sNamespaces,
   fetchSessionClusterIds,
 } from '/@/api/flink/app/flinkHistory';
-import { handleFormValue, isSessionMode } from '../../app/utils';
+import { handleFormValue } from '../../app/utils';
 import { useMessage } from '/@/hooks/web/useMessage';
 import { ClusterAddTypeEnum } from '/@/enums/appEnum';
 import { useI18n } from '/@/hooks/web/useI18n';
@@ -89,6 +89,22 @@ export const useClusterSetting = () => {
       }
     }
   }
+
+  function isAddExistYarnSession(value: Recordable) {
+    return (
+      value.executionMode == ExecModeEnum.YARN_SESSION &&
+      value.addType == ClusterAddTypeEnum.ADD_EXISTING
+    );
+  }
+
+  // session mode
+  function isShowInSessionMode(value: Recordable): boolean {
+    if (value.executionMode == ExecModeEnum.YARN_SESSION) {
+      return value.addType == ClusterAddTypeEnum.ADD_NEW;
+    }
+    return value.executionMode == ExecModeEnum.KUBERNETES_SESSION;
+  }
+
   const getClusterSchema = computed((): FormSchema[] => {
     return [
       {
@@ -127,18 +143,10 @@ export const useClusterSetting = () => {
         },
         rules: [{ required: true, message: 'Flink Version is required' }],
       },
-      {
-        field: 'yarnQueue',
-        label: 'Yarn Queue',
-        component: 'Input',
-        componentProps: {
-          placeholder: 'Please enter yarn queue',
-        },
-        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
-      },
       {
         field: 'addType',
         label: t('flink.setting.cluster.form.addType'),
+        ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
         component: 'Select',
         defaultValue: ClusterAddTypeEnum.ADD_EXISTING,
         componentProps: {
@@ -156,6 +164,17 @@ export const useClusterSetting = () => {
           ],
         },
       },
+      {
+        field: 'yarnQueue',
+        label: 'Yarn Queue',
+        component: 'Input',
+        componentProps: {
+          placeholder: 'Please enter yarn queue',
+        },
+        ifShow: ({ values }) =>
+          values.executionMode == ExecModeEnum.YARN_SESSION &&
+          values.addType == ClusterAddTypeEnum.ADD_NEW,
+      },
       {
         field: 'address',
         label: 'Address',
@@ -168,7 +187,8 @@ export const useClusterSetting = () => {
                 : 'Please enter cluster address,  e.g: http://host:port',
           };
         },
-        ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING,
+        ifShow: ({ values }) =>
+          isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE,
         rules: [{ required: true, message: t('flink.setting.cluster.required.address') }],
       },
       {
@@ -178,9 +198,7 @@ export const useClusterSetting = () => {
         componentProps: {
           placeholder: 'Please enter Yarn Session clusterId',
         },
-        ifShow: ({ values }) =>
-          values.addType == ClusterAddTypeEnum.ADD_EXISTING &&
-          values.executionMode == ExecModeEnum.YARN_SESSION,
+        ifShow: ({ values }) => isAddExistYarnSession(values),
         rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }],
       },
       {
@@ -254,7 +272,7 @@ export const useClusterSetting = () => {
       {
         field: 'resolveOrder',
         label: 'Resolve Order',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder },
         rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }],
@@ -262,7 +280,7 @@ export const useClusterSetting = () => {
       {
         field: 'slot',
         label: 'Task Slots',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'InputNumber',
         componentProps: {
           placeholder: 'Number of slots per TaskManager',
@@ -274,14 +292,14 @@ export const useClusterSetting = () => {
       {
         field: 'totalOptions',
         label: 'Total Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams),
       },
       {
         field: 'totalItem',
         label: 'totalItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'totalOptions', field, '.memory', true),
@@ -289,7 +307,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptions',
         label: 'JM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -304,7 +322,7 @@ export const useClusterSetting = () => {
       {
         field: 'jmOptionsItem',
         label: 'jmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'),
@@ -312,7 +330,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptions',
         label: 'TM Memory Options',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         componentProps: {
           showSearch: true,
@@ -327,7 +345,7 @@ export const useClusterSetting = () => {
       {
         field: 'tmOptionsItem',
         label: 'tmOptionsItem',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Select',
         renderColContent: ({ model, field }) =>
           renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'),
@@ -335,7 +353,7 @@ export const useClusterSetting = () => {
       {
         field: 'dynamicProperties',
         label: 'Dynamic Properties',
-        ifShow: ({ values }) => isSessionMode(values.executionMode),
+        ifShow: ({ values }) => isShowInSessionMode(values),
         component: 'Input',
         render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams),
       },
@@ -366,14 +384,18 @@ export const useClusterSetting = () => {
         });
         return params;
       case ExecModeEnum.YARN_SESSION:
-        Object.assign(params, {
-          options: JSON.stringify(options),
-          yarnQueue: values.yarnQueue || 'default',
-          dynamicProperties: values.dynamicProperties,
-          resolveOrder: values.resolveOrder,
-          address: values.address,
-          flameGraph: values.flameGraph,
-        });
+        if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
+          Object.assign(params, {
+            address: values.address,
+          });
+        } else {
+          Object.assign(params, {
+            options: JSON.stringify(options),
+            yarnQueue: values.yarnQueue || 'default',
+            dynamicProperties: values.dynamicProperties,
+            resolveOrder: values.resolveOrder,
+          });
+        }
         return params;
       case ExecModeEnum.KUBERNETES_SESSION:
         Object.assign(params, {
@@ -386,7 +408,6 @@ export const useClusterSetting = () => {
           k8sConf: values.k8sConf,
           flinkImage: values.flinkImage || null,
           address: values.address,
-          flameGraph: values.flameGraph,
         });
         return params;
       default:
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 278d46758..264653d49 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 case class DeployRequest(flinkVersion: FlinkVersion,
                          clusterId: String,
                          executionMode: ExecutionMode,
-                         flameGraph: JavaMap[String, java.io.Serializable],
                          properties: JavaMap[String, Any],
                          @Nullable k8sDeployParam: KubernetesDeployParam) {
 
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 9e91914ff..9872d082a 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
          |    exposedType      : ${deployRequest.k8sDeployParam.flinkRestExposedType}
          |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
          |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)
@@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
         client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
       if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
       } else {
         null
       }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 21b216e77..128771131 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          |-------------------------------------------------------------------------------------------
          |""".stripMargin)


[incubator-streampark] 02/02: [Improve] drop column flame_graph in t_flink_cluster

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 452929131255efb5bbea3a41bba571f42813df0c
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 02:04:16 2022 +0800

    [Improve] drop column flame_graph in t_flink_cluster
---
 .../src/assembly/script/schema/mysql-schema.sql                       | 1 -
 .../src/assembly/script/schema/pgsql-schema.sql                       | 1 -
 .../src/assembly/script/upgrade/mysql/2.0.0.sql                       | 4 +++-
 .../streampark-console-service/src/main/resources/db/schema-h2.sql    | 1 -
 .../src/main/resources/mapper/core/FlinkClusterMapper.xml             | 1 -
 5 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
index dbc244896..dda27ac56 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
@@ -461,7 +461,6 @@ create table `t_flink_cluster` (
   `dynamic_properties` text comment 'allows specifying multiple generic configuration options',
   `k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
   `k8s_hadoop_integration` tinyint default 0,
-  `flame_graph` tinyint default 0 comment 'flameGraph enableļ¼Œdefault disable',
   `k8s_conf` varchar(255) default null comment 'the path where the k8s configuration file is located',
   `resolve_order` int default null,
   `exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
index 38ec0c479..ecefb7de6 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
@@ -311,7 +311,6 @@ create table "public"."t_flink_cluster" (
   "dynamic_properties" text collate "pg_catalog"."default",
   "k8s_rest_exposed_type" int2 default 2,
   "k8s_hadoop_integration" boolean default false,
-  "flame_graph" boolean default false,
   "k8s_conf" varchar(255) collate "pg_catalog"."default",
   "resolve_order" int4,
   "exception" text collate "pg_catalog"."default",
diff --git a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
index 888325aee..872dee223 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
@@ -79,7 +79,9 @@ alter table `t_flink_project`
     add column `modify_time` datetime not null default current_timestamp on update current_timestamp after `create_time`,
     add index `inx_team` (`team_id`) using btree;
 
-alter table `t_flink_cluster` add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
+alter table `t_flink_cluster`
+    drop column `flame_graph`,
+    add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
 
 -- change `update_time` to `modify_time`
 alter table `t_app_build_pipe` change column `update_time` `modify_time` datetime not null default current_timestamp on update current_timestamp;
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 2d54fe758..5cf0f13f8 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -409,7 +409,6 @@ create table if not exists `t_flink_cluster` (
   `dynamic_properties` text comment 'allows specifying multiple generic configuration options',
   `k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
   `k8s_hadoop_integration` tinyint default 0,
-  `flame_graph` tinyint default 0 comment 'flameGraph enableļ¼Œdefault disable',
   `k8s_conf` varchar(255) default null comment 'the path where the k 8 s configuration file is located',
   `resolve_order` tinyint default null,
   `exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index e22a44869..c459c4b72 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -36,7 +36,6 @@
         <result column="dynamic_properties" jdbcType="LONGVARCHAR" property="dynamicProperties"/>
         <result column="k8s_rest_exposed_type" jdbcType="TINYINT" property="k8sRestExposedType"/>
         <result column="k8s_hadoop_integration" jdbcType="BOOLEAN" property="k8sHadoopIntegration"/>
-        <result column="flame_graph" jdbcType="BOOLEAN" property="flameGraph"/>
         <result column="k8s_conf" jdbcType="VARCHAR" property="k8sConf"/>
         <result column="resolve_order" jdbcType="INTEGER" property="resolveOrder"/>
         <result column="exception" jdbcType="LONGVARCHAR" property="exception"/>