You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 18:04:36 UTC
[incubator-streampark] 01/02: [Bug] flink yarn session cluster bug fixed
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit d2f67fd60afe82ff65d5cea3ee2e93d0dd95dbc2
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 02:02:31 2022 +0800
[Bug] flink yarn session cluster bug fixed
---
.../core/controller/FlinkClusterController.java | 3 +-
.../console/core/entity/Application.java | 9 ++-
.../console/core/entity/FlinkCluster.java | 46 +++++++++--
.../console/core/mapper/FlinkClusterMapper.java | 2 +-
.../console/core/service/FlinkClusterService.java | 4 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 93 +++++++++++-----------
.../resources/mapper/core/FlinkClusterMapper.xml | 7 +-
.../src/views/flink/app/utils/index.ts | 4 -
.../src/views/flink/setting/AddCluster.vue | 15 +---
.../src/views/flink/setting/EditCluster.vue | 16 +---
.../views/flink/setting/hooks/useClusterSetting.ts | 85 ++++++++++++--------
.../flink/submit/bean/DeployRequest.scala | 1 -
.../impl/KubernetesNativeSessionSubmit.scala | 3 +-
.../flink/submit/impl/YarnSessionSubmit.scala | 1 -
14 files changed, 160 insertions(+), 129 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 5927012e5..2482ad13c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -57,7 +57,7 @@ public class FlinkClusterController {
@PostMapping("check")
public RestResponse check(FlinkCluster cluster) {
- String checkResult = flinkClusterService.check(cluster);
+ ResponseResult checkResult = flinkClusterService.check(cluster);
return RestResponse.success(checkResult);
}
@@ -78,7 +78,6 @@ public class FlinkClusterController {
flinkCluster.setAddress(cluster.getAddress());
flinkCluster.setExecutionMode(cluster.getExecutionMode());
flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
- flinkCluster.setFlameGraph(cluster.getFlameGraph());
flinkCluster.setFlinkImage(cluster.getFlinkImage());
flinkCluster.setOptions(cluster.getOptions());
flinkCluster.setYarnQueue(cluster.getYarnQueue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8be656d51..7765869bd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -482,9 +482,12 @@ public class Application implements Serializable {
@SneakyThrows
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
- Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
- map.entrySet().removeIf(entry -> entry.getValue() == null);
- return map;
+ if (StringUtils.isNotEmpty(this.options)) {
+ Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+ map.entrySet().removeIf(entry -> entry.getValue() == null);
+ return map;
+ }
+ return Collections.emptyMap();
}
@JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 9ee264226..965bfbda4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.flink.submit.FlinkSubmitter;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -35,12 +36,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Data;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.http.client.config.RequestConfig;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URI;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -86,8 +89,6 @@ public class FlinkCluster implements Serializable {
private Integer k8sRestExposedType;
- private Boolean flameGraph;
-
private String k8sConf;
private Integer resolveOrder;
@@ -115,13 +116,18 @@ public class FlinkCluster implements Serializable {
@JsonIgnore
@SneakyThrows
public Map<String, Object> getOptionMap() {
- Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
- if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
- map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
- map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+ if (StringUtils.isNotEmpty(this.options)) {
+ Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
+ if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+ map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
+ if (StringUtils.isNotEmpty(this.yarnQueue)) {
+ map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+ }
+ }
+ map.entrySet().removeIf(entry -> entry.getValue() == null);
+ return map;
}
- map.entrySet().removeIf(entry -> entry.getValue() == null);
- return map;
+ return Collections.emptyMap();
}
@JsonIgnore
@@ -159,6 +165,30 @@ public class FlinkCluster implements Serializable {
return false;
}
+ public boolean verifyFlinkYarnCluster() {
+ if (address == null) {
+ return false;
+ }
+ String[] array = address.split(",");
+ for (String url : array) {
+ try {
+ new URI(url);
+ } catch (Exception ignored) {
+ return false;
+ }
+ try {
+ String format = "%s/proxy/%s/overview";
+ String yarnSessionUrl = String.format(format, url, this.clusterId);
+ String result = HttpClientUtils.httpGetRequest(yarnSessionUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+ JacksonUtils.read(result, Overview.class);
+ return true;
+ } catch (Exception ignored) {
+ // this address failed or did not return a Flink overview; try the next one
+ }
+ }
+ return false;
+ }
+
@JsonIgnore
public Map<String, String> getFlinkConfig() throws MalformedURLException, JsonProcessingException {
URI activeAddress = this.getActiveAddress();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index 428aa8e99..d9e9b6558 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param;
public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
- Boolean existsByClusterId(@Param("clusterId") String clusterId);
+ Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id);
Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index a2c525441..2be56cb3e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -24,7 +24,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
public interface FlinkClusterService extends IService<FlinkCluster> {
- String check(FlinkCluster flinkCluster);
+ ResponseResult check(FlinkCluster flinkCluster);
ResponseResult create(FlinkCluster flinkCluster);
@@ -36,7 +36,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
ResponseResult shutdown(FlinkCluster flinkCluster);
- Boolean existsByClusterId(String clusterId);
+ Boolean existsByClusterId(String clusterId, Long id);
Boolean existsByClusterName(String clusterName, Long id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index da14fa90a..8217aee40 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,11 +17,10 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -49,10 +48,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import java.io.Serializable;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -84,44 +80,60 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
private SettingService settingService;
@Override
- public String check(FlinkCluster cluster) {
- if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
- return "error";
- }
- //1) Check if name is duplicate, if it already exists
+ public ResponseResult check(FlinkCluster cluster) {
+ ResponseResult result = new ResponseResult();
+ result.setStatus(0);
+
+ //1) Check whether the cluster name already exists
Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId());
if (existsByClusterName) {
- return "exists";
+ result.setMsg("clusterName is already exists,please check!");
+ result.setStatus(1);
+ return result;
+ }
+
+ //2) Check whether the target cluster (clusterId) already exists
+ String clusterId = cluster.getClusterId();
+ if (StringUtils.isNotEmpty(clusterId)) {
+ Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId());
+ if (existsByClusterId) {
+ result.setMsg("the clusterId " + clusterId + " is already exists,please check!");
+ result.setStatus(2);
+ return result;
+ }
}
+ // 3) Check connection
if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
- //2) Check if the connection can be made to
- return cluster.verifyConnection() ? "success" : "fail";
+ if (!cluster.verifyConnection()) {
+ result.setMsg("the remote cluster connection failed, please check!");
+ result.setStatus(3);
+ return result;
+ }
+ } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) {
+ if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) {
+ if (!cluster.verifyFlinkYarnCluster()) {
+ result.setMsg("the flink cluster connection failed, please check!");
+ result.setStatus(4);
+ return result;
+ }
+ }
}
- return "success";
+ return result;
}
@Override
public ResponseResult create(FlinkCluster flinkCluster) {
ResponseResult result = new ResponseResult();
- if (StringUtils.isBlank(flinkCluster.getClusterName())) {
- result.setMsg("clusterName can't empty!");
- result.setStatus(0);
- return result;
- }
- String clusterId = flinkCluster.getClusterId();
- if (StringUtils.isNoneBlank(clusterId)) {
- Boolean existsByClusterId = this.existsByClusterId(clusterId);
- if (existsByClusterId) {
- result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
- result.setStatus(0);
- return result;
- }
- }
flinkCluster.setUserId(commonService.getUserId());
flinkCluster.setCreateTime(new Date());
- // remote mode directly set STARTED
- if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+ if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) {
+ flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+ } else {
+ flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ }
+ } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
@@ -167,7 +179,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkEnv.getFlinkVersion(),
flinkCluster.getClusterId(),
executionModeEnum,
- flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null,
flinkCluster.getProperties(),
kubernetesDeployParam
);
@@ -176,8 +187,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
if (null != deployResponse) {
if (deployResponse.message() == null) {
- updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+ updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
updateWrapper.set(FlinkCluster::getException, null);
update(updateWrapper);
@@ -198,8 +209,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
updateWrapper.set(FlinkCluster::getException, e.toString());
update(updateWrapper);
result.setStatus(0);
- result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- return result;
+ throw new ApiDetailException(e);
}
}
@@ -277,8 +287,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public Boolean existsByClusterId(String clusterId) {
- return this.baseMapper.existsByClusterId(clusterId);
+ public Boolean existsByClusterId(String clusterId, Long id) {
+ return this.baseMapper.existsByClusterId(clusterId, id);
}
@Override
@@ -307,15 +317,4 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
return result;
}
- private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) {
- Map<String, Serializable> flameGraph = new HashMap<>(8);
- flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter");
- flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType());
- flameGraph.put("id", flinkCluster.getId());
- flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report"));
- flameGraph.put("token", Utils.uuid());
- flameGraph.put("sampleInterval", 1000 * 60 * 2);
- flameGraph.put("metricInterval", 1000 * 60 * 2);
- return flameGraph;
- }
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 1e1a85c9d..e22a44869 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -47,7 +47,12 @@
<select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String">
select count(1)
from t_flink_cluster
- where cluster_id=#{clusterId}
+ <where>
+ cluster_id=#{clusterId}
+ <if test="id != null">
+ and id <> #{id}
+ </if>
+ </where>
limit 1
</select>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 21d5d1944..4f1b42492 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -291,7 +291,3 @@ export const filterOption = (input: string, options: Recordable) => {
export function isK8sExecMode(mode: number): boolean {
return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode);
}
-// session mode
-export function isSessionMode(mode: number): boolean {
- return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
-}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index 0ee7c4afb..f67f0cbe7 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -50,7 +50,8 @@
const params = handleSubmitParams(values);
if (Object.keys(params).length > 0) {
const res = await fetchCheckCluster(params);
- if (res === 'success') {
+ const status = parseInt(res.status);
+ if (status === 0) {
const resp = await fetchCreateCluster(params);
if (resp.status) {
Swal.fire({
@@ -63,18 +64,8 @@
} else {
Swal.fire(resp.msg);
}
- } else if (res === 'exists') {
- Swal.fire(
- 'Failed',
- 'the cluster name: ' + values.clusterName + ' is already exists,please check',
- 'error',
- );
} else {
- Swal.fire(
- 'Failed',
- 'the address is invalid or connection failure, please check',
- 'error',
- );
+ Swal.fire('Failed', res.msg, 'error');
}
}
} catch (error) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 0b58d3842..32836455e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -65,7 +65,8 @@
id: cluster.id,
});
const res = await fetchCheckCluster(params);
- if (res === 'success') {
+ const status = parseInt(res.status);
+ if (status === 0) {
const resp = await fetchUpdateCluster(params);
if (resp.status) {
Swal.fire({
@@ -78,18 +79,8 @@
} else {
Swal.fire(resp.data.msg);
}
- } else if (res === 'exists') {
- Swal.fire(
- 'Failed',
- 'the cluster name: ' + values.clusterName + ' is already exists,please check',
- 'error',
- );
} else {
- Swal.fire(
- 'Failed',
- 'the address is invalid or connection failure, please check',
- 'error',
- );
+ Swal.fire('Failed', res.msg, 'error');
}
}
} catch (error) {
@@ -122,7 +113,6 @@
flinkImage: cluster.flinkImage,
serviceAccount: cluster.serviceAccount,
k8sConf: cluster.k8sConf,
- flameGraph: cluster.flameGraph,
k8sNamespace: cluster.k8sNamespace,
...resetParams,
});
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 77de7dca3..74698ad46 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -34,7 +34,7 @@ import {
fetchK8sNamespaces,
fetchSessionClusterIds,
} from '/@/api/flink/app/flinkHistory';
-import { handleFormValue, isSessionMode } from '../../app/utils';
+import { handleFormValue } from '../../app/utils';
import { useMessage } from '/@/hooks/web/useMessage';
import { ClusterAddTypeEnum } from '/@/enums/appEnum';
import { useI18n } from '/@/hooks/web/useI18n';
@@ -89,6 +89,22 @@ export const useClusterSetting = () => {
}
}
}
+
+ function isAddExistYarnSession(value: Recordable) {
+ return (
+ value.executionMode == ExecModeEnum.YARN_SESSION &&
+ value.addType == ClusterAddTypeEnum.ADD_EXISTING
+ );
+ }
+
+ // session mode
+ function isShowInSessionMode(value: Recordable): boolean {
+ if (value.executionMode == ExecModeEnum.YARN_SESSION) {
+ return value.addType == ClusterAddTypeEnum.ADD_NEW;
+ }
+ return value.executionMode == ExecModeEnum.KUBERNETES_SESSION;
+ }
+
const getClusterSchema = computed((): FormSchema[] => {
return [
{
@@ -127,18 +143,10 @@ export const useClusterSetting = () => {
},
rules: [{ required: true, message: 'Flink Version is required' }],
},
- {
- field: 'yarnQueue',
- label: 'Yarn Queue',
- component: 'Input',
- componentProps: {
- placeholder: 'Please enter yarn queue',
- },
- ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
- },
{
field: 'addType',
label: t('flink.setting.cluster.form.addType'),
+ ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
component: 'Select',
defaultValue: ClusterAddTypeEnum.ADD_EXISTING,
componentProps: {
@@ -156,6 +164,17 @@ export const useClusterSetting = () => {
],
},
},
+ {
+ field: 'yarnQueue',
+ label: 'Yarn Queue',
+ component: 'Input',
+ componentProps: {
+ placeholder: 'Please enter yarn queue',
+ },
+ ifShow: ({ values }) =>
+ values.executionMode == ExecModeEnum.YARN_SESSION &&
+ values.addType == ClusterAddTypeEnum.ADD_NEW,
+ },
{
field: 'address',
label: 'Address',
@@ -168,7 +187,8 @@ export const useClusterSetting = () => {
: 'Please enter cluster address, e.g: http://host:port',
};
},
- ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING,
+ ifShow: ({ values }) =>
+ isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE,
rules: [{ required: true, message: t('flink.setting.cluster.required.address') }],
},
{
@@ -178,9 +198,7 @@ export const useClusterSetting = () => {
componentProps: {
placeholder: 'Please enter Yarn Session clusterId',
},
- ifShow: ({ values }) =>
- values.addType == ClusterAddTypeEnum.ADD_EXISTING &&
- values.executionMode == ExecModeEnum.YARN_SESSION,
+ ifShow: ({ values }) => isAddExistYarnSession(values),
rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }],
},
{
@@ -254,7 +272,7 @@ export const useClusterSetting = () => {
{
field: 'resolveOrder',
label: 'Resolve Order',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder },
rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }],
@@ -262,7 +280,7 @@ export const useClusterSetting = () => {
{
field: 'slot',
label: 'Task Slots',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'InputNumber',
componentProps: {
placeholder: 'Number of slots per TaskManager',
@@ -274,14 +292,14 @@ export const useClusterSetting = () => {
{
field: 'totalOptions',
label: 'Total Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams),
},
{
field: 'totalItem',
label: 'totalItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'totalOptions', field, '.memory', true),
@@ -289,7 +307,7 @@ export const useClusterSetting = () => {
{
field: 'jmOptions',
label: 'JM Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: {
showSearch: true,
@@ -304,7 +322,7 @@ export const useClusterSetting = () => {
{
field: 'jmOptionsItem',
label: 'jmOptionsItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'),
@@ -312,7 +330,7 @@ export const useClusterSetting = () => {
{
field: 'tmOptions',
label: 'TM Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: {
showSearch: true,
@@ -327,7 +345,7 @@ export const useClusterSetting = () => {
{
field: 'tmOptionsItem',
label: 'tmOptionsItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'),
@@ -335,7 +353,7 @@ export const useClusterSetting = () => {
{
field: 'dynamicProperties',
label: 'Dynamic Properties',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Input',
render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams),
},
@@ -366,14 +384,18 @@ export const useClusterSetting = () => {
});
return params;
case ExecModeEnum.YARN_SESSION:
- Object.assign(params, {
- options: JSON.stringify(options),
- yarnQueue: values.yarnQueue || 'default',
- dynamicProperties: values.dynamicProperties,
- resolveOrder: values.resolveOrder,
- address: values.address,
- flameGraph: values.flameGraph,
- });
+ if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
+ Object.assign(params, {
+ address: values.address,
+ });
+ } else {
+ Object.assign(params, {
+ options: JSON.stringify(options),
+ yarnQueue: values.yarnQueue || 'default',
+ dynamicProperties: values.dynamicProperties,
+ resolveOrder: values.resolveOrder,
+ });
+ }
return params;
case ExecModeEnum.KUBERNETES_SESSION:
Object.assign(params, {
@@ -386,7 +408,6 @@ export const useClusterSetting = () => {
k8sConf: values.k8sConf,
flinkImage: values.flinkImage || null,
address: values.address,
- flameGraph: values.flameGraph,
});
return params;
default:
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 278d46758..264653d49 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
case class DeployRequest(flinkVersion: FlinkVersion,
clusterId: String,
executionMode: ExecutionMode,
- flameGraph: JavaMap[String, java.io.Serializable],
properties: JavaMap[String, Any],
@Nullable k8sDeployParam: KubernetesDeployParam) {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 9e91914ff..9872d082a 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
- | flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
@@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
if (client.getWebInterfaceURL != null) {
- DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+ DeployResponse(client.getWebInterfaceURL, client.getClusterId)
} else {
null
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 21b216e77..128771131 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
- | flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)