You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/29 15:37:06 UTC
[incubator-streampark] branch dev updated: [Bug] flink cluster management bug fixed (#2100)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 6d6644392 [Bug] flink cluster management bug fixed (#2100)
6d6644392 is described below
commit 6d664439265535980789808802a9e06ec817553d
Author: benjobs <be...@apache.org>
AuthorDate: Tue Nov 29 23:36:59 2022 +0800
[Bug] flink cluster management bug fixed (#2100)
---
.../streampark/common/enums/ClusterState.java | 7 +-
.../streampark/common/util/HadoopConfigUtils.scala | 6 +-
.../src/assembly/script/schema/mysql-schema.sql | 1 -
.../src/assembly/script/schema/pgsql-schema.sql | 1 -
.../src/assembly/script/upgrade/mysql/2.0.0.sql | 4 +-
.../streampark/console/base/util/CommonUtils.java | 11 +-
.../core/controller/FlinkClusterController.java | 48 ++--
.../console/core/entity/AppBuildPipeline.java | 10 +-
.../console/core/entity/Application.java | 11 +-
.../console/core/entity/FlinkCluster.java | 61 +++--
.../console/core/mapper/ApplicationMapper.java | 4 +
.../console/core/mapper/FlinkClusterMapper.java | 2 +-
.../console/core/service/ApplicationService.java | 3 +
.../console/core/service/FlinkClusterService.java | 14 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 6 +-
.../core/service/impl/ApplicationServiceImpl.java | 84 ++++---
.../core/service/impl/FlinkClusterServiceImpl.java | 245 +++++++++++----------
.../src/main/resources/db/schema-h2.sql | 1 -
.../resources/mapper/core/ApplicationMapper.xml | 16 ++
.../resources/mapper/core/FlinkClusterMapper.xml | 8 +-
.../src/api/flink/app/app.type.ts | 2 -
.../src/enums/flinkEnum.ts | 6 +-
.../src/locales/lang/en/flink/app.ts | 2 -
.../src/locales/lang/en/flink/setting.ts | 2 +-
.../src/locales/lang/zh-CN/flink/app.ts | 2 -
.../src/locales/lang/zh-CN/flink/setting.ts | 2 +-
.../src/views/flink/app/Add.vue | 11 +-
.../src/views/flink/app/EditFlink.vue | 1 -
.../src/views/flink/app/EditStreamPark.vue | 27 +--
.../flink/app/hooks/useCreateAndEditSchema.ts | 12 +-
.../src/views/flink/app/hooks/useFlinkSchema.ts | 20 +-
.../src/views/flink/app/utils/index.ts | 5 -
.../src/views/flink/setting/AddCluster.vue | 19 +-
.../src/views/flink/setting/EditCluster.vue | 36 +--
.../setting/components/FlinkClusterSetting.vue | 184 ++++------------
.../views/flink/setting/hooks/useClusterSetting.ts | 94 +++++---
.../flink/submit/bean/DeployRequest.scala | 1 -
.../flink/submit/bean/DeployResponse.scala | 4 +-
.../impl/KubernetesNativeApplicationSubmit.scala | 1 -
.../impl/KubernetesNativeSessionSubmit.scala | 3 +-
.../streampark/flink/submit/impl/LocalSubmit.scala | 3 +-
.../flink/submit/impl/RemoteSubmit.scala | 3 +-
.../flink/submit/impl/YarnSessionSubmit.scala | 13 +-
.../submit/trait/KubernetesNativeSubmitTrait.scala | 3 +-
.../flink/submit/trait/YarnSubmitTrait.scala | 4 +-
45 files changed, 457 insertions(+), 546 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
index cc2f70138..74d12f051 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ClusterState.java
@@ -34,7 +34,12 @@ public enum ClusterState implements Serializable {
/**
* cluster stopped
*/
- STOPED(2);
+ STOPED(2),
+
+ /**
+ * cluster lost
+ */
+ LOST(3);
private final Integer value;
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 3cfadbb68..8c67ea77e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.fs.LfsOperator
import org.apache.commons.io.{FileUtils => ApacheFileUtils}
import java.io.File
-import java.util.{Map => JavaMap, Optional => JOption}
+import java.util.{Collections, Map => JavaMap, Optional => JOption}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
@@ -117,7 +117,7 @@ object HadoopConfigUtils {
.filter(f => HADOOP_CLIENT_CONF_FILES.contains(f.getName))
.map(f => f.getName -> ApacheFileUtils.readFileToString(f, "UTF-8"))
.toMap.asJava)
- .getOrElse(Maps.newHashMap[String, String])
+ .getOrElse(Collections.emptyMap[String, String]())
/**
* Read system hive config to Map
@@ -129,7 +129,7 @@ object HadoopConfigUtils {
.filter(f => HIVE_CLIENT_CONF_FILES.contains(f.getName))
.map(f => f.getName -> ApacheFileUtils.readFileToString(f, "UTF-8"))
.toMap.asJava)
- .getOrElse(Maps.newHashMap[String, String])
+ .getOrElse(Collections.emptyMap[String, String]())
}
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
index dbc244896..dda27ac56 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
@@ -461,7 +461,6 @@ create table `t_flink_cluster` (
`dynamic_properties` text comment 'allows specifying multiple generic configuration options',
`k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
`k8s_hadoop_integration` tinyint default 0,
- `flame_graph` tinyint default 0 comment 'flameGraph enable,default disable',
`k8s_conf` varchar(255) default null comment 'the path where the k8s configuration file is located',
`resolve_order` int default null,
`exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
index 38ec0c479..ecefb7de6 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
@@ -311,7 +311,6 @@ create table "public"."t_flink_cluster" (
"dynamic_properties" text collate "pg_catalog"."default",
"k8s_rest_exposed_type" int2 default 2,
"k8s_hadoop_integration" boolean default false,
- "flame_graph" boolean default false,
"k8s_conf" varchar(255) collate "pg_catalog"."default",
"resolve_order" int4,
"exception" text collate "pg_catalog"."default",
diff --git a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
index 888325aee..872dee223 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql/2.0.0.sql
@@ -79,7 +79,9 @@ alter table `t_flink_project`
add column `modify_time` datetime not null default current_timestamp on update current_timestamp after `create_time`,
add index `inx_team` (`team_id`) using btree;
-alter table `t_flink_cluster` add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
+alter table `t_flink_cluster`
+ drop column `flame_graph`,
+ add column `dynamic_properties` text comment 'allows specifying multiple generic configuration options' after `flink_image`;
-- change `update_time` to `modify_time`
alter table `t_app_build_pipe` change column `update_time` `modify_time` datetime not null default current_timestamp on update current_timestamp;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 593fc9415..996a9251f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.base.util;
-import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.AssertUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.beans.BeanMap;
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -637,4 +638,12 @@ public final class CommonUtils implements Serializable {
}
}
+ public static boolean isLegalUrl(String url) {
+ try {
+ new URI(url);
+ return true;
+ } catch (Exception ignored) {
+ return false;
+ }
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 5927012e5..942b408b8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -57,39 +57,22 @@ public class FlinkClusterController {
@PostMapping("check")
public RestResponse check(FlinkCluster cluster) {
- String checkResult = flinkClusterService.check(cluster);
+ ResponseResult checkResult = flinkClusterService.check(cluster);
return RestResponse.success(checkResult);
}
@PostMapping("create")
@RequiresPermissions("cluster:create")
public RestResponse create(FlinkCluster cluster) {
- ResponseResult result = flinkClusterService.create(cluster);
- return RestResponse.success(result);
+ Boolean success = flinkClusterService.create(cluster);
+ return RestResponse.success(success);
}
@PostMapping("update")
@RequiresPermissions("cluster:update")
public RestResponse update(FlinkCluster cluster) {
- FlinkCluster flinkCluster = flinkClusterService.getById(cluster.getId());
- flinkCluster.setClusterId(cluster.getClusterId());
- flinkCluster.setVersionId(cluster.getVersionId());
- flinkCluster.setClusterName(cluster.getClusterName());
- flinkCluster.setAddress(cluster.getAddress());
- flinkCluster.setExecutionMode(cluster.getExecutionMode());
- flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
- flinkCluster.setFlameGraph(cluster.getFlameGraph());
- flinkCluster.setFlinkImage(cluster.getFlinkImage());
- flinkCluster.setOptions(cluster.getOptions());
- flinkCluster.setYarnQueue(cluster.getYarnQueue());
- flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
- flinkCluster.setK8sConf(cluster.getK8sConf());
- flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
- flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
- flinkCluster.setResolveOrder(cluster.getResolveOrder());
- flinkCluster.setServiceAccount(cluster.getServiceAccount());
- flinkCluster.setDescription(cluster.getDescription());
- return RestResponse.success(flinkClusterService.update(flinkCluster));
+ flinkClusterService.update(cluster);
+ return RestResponse.success();
}
@PostMapping("get")
@@ -99,23 +82,20 @@ public class FlinkClusterController {
}
@PostMapping("start")
- public RestResponse start(FlinkCluster flinkCluster) {
- FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult start = flinkClusterService.start(cluster);
- return RestResponse.success(start);
+ public RestResponse start(FlinkCluster cluster) {
+ flinkClusterService.start(cluster);
+ return RestResponse.success();
}
@PostMapping("shutdown")
- public RestResponse shutdown(FlinkCluster flinkCluster) {
- FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult shutdown = flinkClusterService.shutdown(cluster);
- return RestResponse.success(shutdown);
+ public RestResponse shutdown(FlinkCluster cluster) {
+ flinkClusterService.shutdown(cluster);
+ return RestResponse.success();
}
@PostMapping("delete")
- public RestResponse delete(FlinkCluster flinkCluster) {
- FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult delete = flinkClusterService.delete(cluster);
- return RestResponse.success(delete);
+ public RestResponse delete(FlinkCluster cluster) {
+ flinkClusterService.delete(cluster);
+ return RestResponse.success();
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
index ee1a9b38a..c7a3046e7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
@@ -34,7 +34,6 @@ import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -46,6 +45,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -122,14 +122,14 @@ public class AppBuildPipeline {
@JsonIgnore
public Map<Integer, PipelineStepStatus> getStepStatus() {
if (StringUtils.isBlank(stepStatusJson)) {
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
try {
return JacksonUtils.read(stepStatusJson, new TypeReference<HashMap<Integer, PipelineStepStatus>>() {
});
} catch (JsonProcessingException e) {
log.error("json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusJson, e);
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
}
@@ -148,14 +148,14 @@ public class AppBuildPipeline {
@JsonIgnore
public Map<Integer, Long> getStepStatusTimestamp() {
if (StringUtils.isBlank(stepStatusTimestampJson)) {
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
try {
return JacksonUtils.read(stepStatusTimestampJson, new TypeReference<HashMap<Integer, Long>>() {
});
} catch (JsonProcessingException e) {
log.error("json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusTimestampJson, e);
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8be656d51..3c2650350 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -226,6 +226,7 @@ public class Application implements Serializable {
/**
* the cluster id bound to the task in remote mode
*/
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
private Long flinkClusterId;
private String description;
@@ -296,7 +297,6 @@ public class Application implements Serializable {
private transient String createTimeTo;
private transient String backUpDescription;
private transient String yarnQueue;
- private transient String yarnSessionClusterId;
/**
* Flink Web UI Url
@@ -482,7 +482,10 @@ public class Application implements Serializable {
@SneakyThrows
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
- Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
+ if (StringUtils.isBlank(this.options)) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
map.entrySet().removeIf(entry -> entry.getValue() == null);
return map;
}
@@ -665,10 +668,6 @@ public class Application implements Serializable {
if (StringUtils.isNotEmpty(appParam.getYarnQueue())) {
hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), appParam.getYarnQueue());
}
- } else if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
- if (StringUtils.isNotEmpty(appParam.getYarnSessionClusterId())) {
- hotParams.put(ConfigConst.KEY_YARN_APP_ID(), appParam.getYarnSessionClusterId());
- }
}
if (!hotParams.isEmpty()) {
this.setHotParams(JacksonUtils.write(hotParams));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 9ee264226..e9c55b2ec 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -23,7 +23,9 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.flink.submit.FlinkSubmitter;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -35,12 +37,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Data;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.http.client.config.RequestConfig;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URI;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -86,8 +90,6 @@ public class FlinkCluster implements Serializable {
private Integer k8sRestExposedType;
- private Boolean flameGraph;
-
private String k8sConf;
private Integer resolveOrder;
@@ -115,10 +117,15 @@ public class FlinkCluster implements Serializable {
@JsonIgnore
@SneakyThrows
public Map<String, Object> getOptionMap() {
- Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class);
+ if (StringUtils.isBlank(this.options)) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
- map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+ if (StringUtils.isNotEmpty(this.yarnQueue)) {
+ map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue);
+ }
}
map.entrySet().removeIf(entry -> entry.getValue() == null);
return map;
@@ -138,23 +145,38 @@ public class FlinkCluster implements Serializable {
return null;
}
- public boolean verifyConnection() {
- if (address == null) {
- return false;
- }
- String[] array = address.split(",");
- for (String url : array) {
- try {
- new URI(url);
- } catch (Exception ignored) {
+ public boolean verifyClusterConnection() {
+ if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+ ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+ if (address == null) {
return false;
}
- try {
- HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
- return true;
- } catch (Exception ignored) {
- //
+ String[] array = address.split(",");
+
+ //1) check url is Legal
+ for (String url: array) {
+ if (!CommonUtils.isLegalUrl(url)) {
+ return false;
+ }
}
+
+ // 2) check connection
+ for (String url : array) {
+ try {
+ String restUrl;
+ if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+ restUrl = url + "/overview";
+ } else {
+ restUrl = url + "/proxy/" + this.clusterId + "/overview";
+ }
+ String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+ JacksonUtils.read(result, Overview.class);
+ return true;
+ } catch (Exception ignored) {
+ //
+ }
+ }
+ return false;
}
return false;
}
@@ -164,6 +186,9 @@ public class FlinkCluster implements Serializable {
URI activeAddress = this.getActiveAddress();
String restUrl = activeAddress.toURL() + "/jobmanager/config";
String json = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+ if (StringUtils.isEmpty(json)) {
+ return Collections.emptyMap();
+ }
List<Map<String, String>> confList = JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {
});
Map<String, String> config = new HashMap<>(0);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 88472e956..e8ed4bed0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -57,4 +57,8 @@ public interface ApplicationMapper extends BaseMapper<Application> {
Boolean existsByJobName(@Param("jobName") String jobName);
List<Application> getByProjectId(@Param("projectId") Long id);
+
+ boolean existsRunningJobByClusterId(@Param("clusterId")Long clusterId);
+
+ boolean existsJobByClusterId(@Param("clusterId")Long clusterId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index 428aa8e99..d9e9b6558 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param;
public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
- Boolean existsByClusterId(@Param("clusterId") String clusterId);
+ Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id);
Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index c3dfc493a..274ab0d13 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -98,4 +98,7 @@ public interface ApplicationService extends IService<Application> {
void forcedStop(Application app);
+ boolean existsRunningJobByClusterId(Long clusterId);
+
+ boolean existsJobByClusterId(Long id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index a2c525441..ecf775994 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -24,19 +24,19 @@ import com.baomidou.mybatisplus.extension.service.IService;
public interface FlinkClusterService extends IService<FlinkCluster> {
- String check(FlinkCluster flinkCluster);
+ ResponseResult check(FlinkCluster flinkCluster);
- ResponseResult create(FlinkCluster flinkCluster);
+ Boolean create(FlinkCluster flinkCluster);
- ResponseResult delete(FlinkCluster flinkCluster);
+ void delete(FlinkCluster flinkCluster);
- ResponseResult update(FlinkCluster flinkCluster);
+ void update(FlinkCluster flinkCluster);
- ResponseResult start(FlinkCluster flinkCluster);
+ void start(FlinkCluster flinkCluster);
- ResponseResult shutdown(FlinkCluster flinkCluster);
+ void shutdown(FlinkCluster flinkCluster);
- Boolean existsByClusterId(String clusterId);
+ Boolean existsByClusterId(String clusterId, Long id);
Boolean existsByClusterName(String clusterName, Long id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index e19e3c371..7daf79764 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -73,7 +73,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +83,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import java.io.File;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -442,14 +442,14 @@ public class AppBuildPipeServiceImpl
@Override
public Map<Long, PipelineStatus> listPipelineStatus(List<Long> appIds) {
if (CollectionUtils.isEmpty(appIds)) {
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
LambdaQueryWrapper<AppBuildPipeline> queryWrapper = new LambdaQueryWrapper<AppBuildPipeline>()
.in(AppBuildPipeline::getAppId, appIds);
List<AppBuildPipeline> appBuildPipelines = baseMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(appBuildPipelines)) {
- return Maps.newHashMap();
+ return Collections.emptyMap();
}
return appBuildPipelines.stream().collect(Collectors.toMap(e -> e.getAppId(), e -> e.getPipelineStatus()));
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index b253dccdb..42ff934c4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
@@ -436,10 +437,19 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv);
envInitializer.storageInitialize(application.getStorageType());
+
+ if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
+ || ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
+ FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
+ boolean conned = flinkCluster.verifyClusterConnection();
+ if (!conned) {
+ throw new ApiAlertException("the target cluster is unavailable, please check!");
+ }
+ }
return true;
} catch (Exception e) {
log.error(ExceptionUtils.stringifyException(e));
- throw new ApplicationException(e);
+ throw new ApiDetailException(e);
}
}
@@ -519,6 +529,26 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return baseMapper.existsByTeamId(teamId);
}
+ @Override
+ public boolean existsRunningJobByClusterId(Long clusterId) {
+ boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+ if (exists) {
+ return true;
+ }
+ for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+ if (clusterId.equals(application.getFlinkClusterId()) &&
+ FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean existsJobByClusterId(Long clusterId) {
+ return baseMapper.existsJobByClusterId(clusterId);
+ }
+
@Override
public String getYarnName(Application appParam) {
String[] args = new String[2];
@@ -711,9 +741,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
public boolean update(Application appParam) {
try {
Application application = getById(appParam.getId());
-
application.setLaunch(LaunchState.NEED_LAUNCH.get());
-
if (application.isUploadJob()) {
if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
@@ -776,9 +804,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
application.setCpFailureAction(appParam.getCpFailureAction());
application.setCpFailureRateInterval(appParam.getCpFailureRateInterval());
application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval());
- application.setFlinkClusterId(appParam.getFlinkClusterId());
application.setTags(appParam.getTags());
+ if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum()) ||
+ ExecutionMode.YARN_PER_JOB.equals(application.getExecutionModeEnum()) ||
+ ExecutionMode.KUBERNETES_NATIVE_APPLICATION.equals(application.getExecutionModeEnum())) {
+ application.setFlinkClusterId(null);
+ }
+
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
@@ -942,7 +975,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (startFuture == null && cancelFuture == null) {
this.updateToStopped(app);
}
-
}
@Override
@@ -998,10 +1030,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
application.setYarnQueue(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()).toString());
}
- } else if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_ID())) {
- application.setYarnSessionClusterId(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID()).toString());
- }
}
}
return application;
@@ -1052,6 +1080,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
+ Long userId = commonService.getUserId();
+ if (!application.getUserId().equals(userId)) {
+ FlinkTrackingTask.addCanceledApp(application.getId(), userId);
+ }
+
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
// infer savepoint
@@ -1063,6 +1096,21 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
+ String clusterId = null;
+ if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
+ clusterId = application.getClusterId();
+ } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
+ AssertUtils.state(cluster != null,
+ String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
+ clusterId = cluster.getClusterId();
+ } else {
+ clusterId = application.getAppId();
+ }
+ }
+
Map<String, Object> properties = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1074,26 +1122,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
URI activeAddress = cluster.getActiveAddress();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
- } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
- if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- AssertUtils.state(cluster != null,
- String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
- "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
- properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
- }
}
- Long userId = commonService.getUserId();
- if (!application.getUserId().equals(userId)) {
- FlinkTrackingTask.addCanceledApp(application.getId(), userId);
- }
- String clusterId = null;
- if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
- clusterId = application.getClusterId();
- } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- clusterId = application.getAppId();
- }
CancelRequest cancelRequest = new CancelRequest(
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index da14fa90a..27a69da6d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,19 +17,19 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
+import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.task.FlinkTrackingTask;
import org.apache.streampark.flink.submit.FlinkSubmitter;
import org.apache.streampark.flink.submit.bean.DeployRequest;
@@ -49,10 +49,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import java.io.Serializable;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -80,66 +77,76 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
@Autowired
private CommonService commonService;
+
@Autowired
- private SettingService settingService;
+ private ApplicationService applicationService;
@Override
- public String check(FlinkCluster cluster) {
- if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
- return "error";
- }
- //1) Check if name is duplicate, if it already exists
+ public ResponseResult check(FlinkCluster cluster) {
+ ResponseResult result = new ResponseResult();
+ result.setStatus(0);
+
+ //1) Check name is already exists
Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId());
if (existsByClusterName) {
- return "exists";
+ result.setMsg("clusterName is already exists,please check!");
+ result.setStatus(1);
+ return result;
}
- if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
- //2) Check if the connection can be made to
- return cluster.verifyConnection() ? "success" : "fail";
+ //2) Check target-cluster is already exists
+ String clusterId = cluster.getClusterId();
+ if (StringUtils.isNotEmpty(clusterId)) {
+ Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId());
+ if (existsByClusterId) {
+ result.setMsg("the clusterId " + clusterId + " is already exists,please check!");
+ result.setStatus(2);
+ return result;
+ }
}
- return "success";
- }
- @Override
- public ResponseResult create(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
- if (StringUtils.isBlank(flinkCluster.getClusterName())) {
- result.setMsg("clusterName can't empty!");
- result.setStatus(0);
- return result;
- }
- String clusterId = flinkCluster.getClusterId();
- if (StringUtils.isNoneBlank(clusterId)) {
- Boolean existsByClusterId = this.existsByClusterId(clusterId);
- if (existsByClusterId) {
- result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
- result.setStatus(0);
+ // 3) Check connection
+ if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
+ if (!cluster.verifyClusterConnection()) {
+ result.setMsg("the remote cluster connection failed, please check!");
+ result.setStatus(3);
return result;
}
+ } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) {
+ if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) {
+ if (!cluster.verifyClusterConnection()) {
+ result.setMsg("the flink cluster connection failed, please check!");
+ result.setStatus(4);
+ return result;
+ }
+ }
}
+
+ return result;
+ }
+
+ @Override
+ public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
flinkCluster.setCreateTime(new Date());
- // remote mode directly set STARTED
- if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+ if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+ if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) {
+ flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+ } else {
+ flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ }
+ } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
}
- try {
- save(flinkCluster);
- result.setStatus(1);
- } catch (Exception e) {
- result.setStatus(0);
- result.setMsg("create cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- }
- return result;
+ return save(flinkCluster);
}
@Override
@Transactional(rollbackFor = {Exception.class})
- public ResponseResult start(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void start(FlinkCluster cluster) {
+ FlinkCluster flinkCluster = getById(cluster.getId());
LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
try {
@@ -158,67 +165,68 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.getK8sRestExposedTypeEnum());
break;
default:
- result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
}
FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
DeployRequest deployRequest = new DeployRequest(
flinkEnv.getFlinkVersion(),
flinkCluster.getClusterId(),
executionModeEnum,
- flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null,
flinkCluster.getProperties(),
kubernetesDeployParam
);
log.info("deploy cluster request " + deployRequest);
Future<DeployResponse> future = executorService.submit(() -> FlinkSubmitter.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
- if (null != deployResponse) {
- if (deployResponse.message() == null) {
- updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
- updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
- updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
- updateWrapper.set(FlinkCluster::getException, null);
- update(updateWrapper);
- result.setStatus(1);
- FlinkTrackingTask.removeFlinkCluster(flinkCluster);
- } else {
- result.setStatus(0);
- result.setMsg("deploy cluster failed," + deployResponse.message());
- }
+ if (deployResponse != null) {
+ updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+ updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
+ updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
+ updateWrapper.set(FlinkCluster::getException, null);
+ update(updateWrapper);
+ FlinkTrackingTask.removeFlinkCluster(flinkCluster);
} else {
- result.setStatus(0);
- result.setMsg("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+ throw new ApiAlertException("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
}
- return result;
} catch (Exception e) {
log.error(e.getMessage(), e);
updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
updateWrapper.set(FlinkCluster::getException, e.toString());
update(updateWrapper);
- result.setStatus(0);
- result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- return result;
+ throw new ApiDetailException(e);
}
}
@Override
- public ResponseResult update(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void update(FlinkCluster cluster) {
+ FlinkCluster flinkCluster = getById(cluster.getId());
+ flinkCluster.setClusterId(cluster.getClusterId());
+ flinkCluster.setVersionId(cluster.getVersionId());
+ flinkCluster.setClusterName(cluster.getClusterName());
+ flinkCluster.setAddress(cluster.getAddress());
+ flinkCluster.setExecutionMode(cluster.getExecutionMode());
+ flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
+ flinkCluster.setFlinkImage(cluster.getFlinkImage());
+ flinkCluster.setOptions(cluster.getOptions());
+ flinkCluster.setYarnQueue(cluster.getYarnQueue());
+ flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
+ flinkCluster.setK8sConf(cluster.getK8sConf());
+ flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
+ flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
+ flinkCluster.setResolveOrder(cluster.getResolveOrder());
+ flinkCluster.setServiceAccount(cluster.getServiceAccount());
+ flinkCluster.setDescription(cluster.getDescription());
try {
updateById(flinkCluster);
- result.setStatus(1);
} catch (Exception e) {
- result.setStatus(0);
- result.setMsg("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+ throw new ApiDetailException("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
}
- return result;
}
@Override
- public ResponseResult shutdown(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void shutdown(FlinkCluster cluster) {
+ FlinkCluster flinkCluster = this.getById(cluster.getId());
+ //1) check mode
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
String clusterId = flinkCluster.getClusterId();
KubernetesDeployParam kubernetesDeployParam = null;
@@ -235,15 +243,35 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.getK8sRestExposedTypeEnum());
break;
default:
- result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
}
if (StringUtils.isBlank(clusterId)) {
- result.setMsg("the clusterId is Empty!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the clusterId can not be empty!");
}
+
+ LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
+ updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
+
+ //2) check cluster is active
+ if (ExecutionMode.YARN_SESSION.equals(executionModeEnum) || ExecutionMode.REMOTE.equals(executionModeEnum)) {
+ if (ClusterState.STARTED.equals(ClusterState.of(flinkCluster.getClusterState()))) {
+ if (!flinkCluster.verifyClusterConnection()) {
+ updateWrapper.set(FlinkCluster::getClusterState, ClusterState.LOST.getValue());
+ update(updateWrapper);
+ throw new ApiAlertException("current cluster is not active, please check");
+ }
+ } else {
+ throw new ApiAlertException("current cluster is not active, please check");
+ }
+ }
+
+ //3) check job if running on cluster
+ boolean existsRunningJob = applicationService.existsRunningJobByClusterId(flinkCluster.getId());
+ if (existsRunningJob) {
+ throw new ApiAlertException("some app is running on this cluster, the cluster cannot be shutdown");
+ }
+
+ //4) shutdown
FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
ShutDownRequest stopRequest = new ShutDownRequest(
flinkEnv.getFlinkVersion(),
@@ -252,33 +280,27 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
kubernetesDeployParam,
flinkCluster.getProperties()
);
- LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
- updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
+
try {
Future<ShutDownResponse> future = executorService.submit(() -> FlinkSubmitter.shutdown(stopRequest));
ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
- if (null != shutDownResponse) {
+ if (shutDownResponse != null) {
updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
update(updateWrapper);
- result.setStatus(1);
- return result;
+ } else {
+ throw new ApiAlertException("get shutdown response failed");
}
- result.setStatus(1);
- result.setMsg("clusterId is not exists!");
- return result;
} catch (Exception e) {
log.error(e.getMessage(), e);
updateWrapper.set(FlinkCluster::getException, e.toString());
update(updateWrapper);
- result.setStatus(0);
- result.setMsg("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- return result;
+ throw new ApiDetailException("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
}
}
@Override
- public Boolean existsByClusterId(String clusterId) {
- return this.baseMapper.existsByClusterId(clusterId);
+ public Boolean existsByClusterId(String clusterId, Long id) {
+ return this.baseMapper.existsByClusterId(clusterId, id);
}
@Override
@@ -287,35 +309,16 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public ResponseResult delete(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
- if (StringUtils.isNoneBlank(flinkCluster.getClusterId())
- && ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())
- && !ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
- result = shutdown(flinkCluster);
- if (0 == result.getStatus()) {
- return result;
- }
+ public void delete(FlinkCluster cluster) {
+ Long id = cluster.getId();
+ FlinkCluster flinkCluster = getById(id);
+ if (flinkCluster == null) {
+ throw new ApiAlertException("flink cluster not exist, please check.");
}
- try {
- removeById(flinkCluster.getId());
- result.setStatus(1);
- } catch (Exception e) {
- result.setStatus(0);
- result.setMsg("delete cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+ if (applicationService.existsJobByClusterId(id)) {
+ throw new ApiAlertException("some app on this cluster, the cluster cannot be delete, please check.");
}
- return result;
+ removeById(id);
}
- private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) {
- Map<String, Serializable> flameGraph = new HashMap<>(8);
- flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter");
- flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType());
- flameGraph.put("id", flinkCluster.getId());
- flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report"));
- flameGraph.put("token", Utils.uuid());
- flameGraph.put("sampleInterval", 1000 * 60 * 2);
- flameGraph.put("metricInterval", 1000 * 60 * 2);
- return flameGraph;
- }
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 2d54fe758..5cf0f13f8 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -409,7 +409,6 @@ create table if not exists `t_flink_cluster` (
`dynamic_properties` text comment 'allows specifying multiple generic configuration options',
`k8s_rest_exposed_type` tinyint default 2 comment 'k8s export(0:loadbalancer,1:clusterip,2:nodeport)',
`k8s_hadoop_integration` tinyint default 0,
- `flame_graph` tinyint default 0 comment 'flameGraph enable,default disable',
`k8s_conf` varchar(255) default null comment 'the path where the k 8 s configuration file is located',
`resolve_order` tinyint default null,
`exception` text comment 'exception information',
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index d68bb17e6..0e3795da7 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -32,6 +32,7 @@
<result column="app_id" jdbcType="VARCHAR" property="appId"/>
<result column="version_id" jdbcType="BIGINT" property="versionId"/>
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
+ <result column="flink_cluster_id" jdbcType="BIGINT" property="flinkClusterId"/>
<result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
<result column="k8s_namespace" jdbcType="VARCHAR" property="k8sNamespace"/>
<result column="app_type" jdbcType="INTEGER" property="appType"/>
@@ -98,6 +99,21 @@
limit 1
</select>
+ <select id="existsJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+ select count(1)
+ from t_flink_app
+ where flink_cluster_id = #{clusterId}
+ limit 1
+ </select>
+
+ <select id="existsRunningJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+ select count(1)
+ from t_flink_app
+ where flink_cluster_id = #{clusterId}
+ and state = 5
+ limit 1
+ </select>
+
<select id="getByProjectId" resultType="org.apache.streampark.console.core.entity.Application" parameterType="java.lang.Long">
select * from t_flink_app where project_id=#{projectId}
</select>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 1e1a85c9d..c459c4b72 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -36,7 +36,6 @@
<result column="dynamic_properties" jdbcType="LONGVARCHAR" property="dynamicProperties"/>
<result column="k8s_rest_exposed_type" jdbcType="TINYINT" property="k8sRestExposedType"/>
<result column="k8s_hadoop_integration" jdbcType="BOOLEAN" property="k8sHadoopIntegration"/>
- <result column="flame_graph" jdbcType="BOOLEAN" property="flameGraph"/>
<result column="k8s_conf" jdbcType="VARCHAR" property="k8sConf"/>
<result column="resolve_order" jdbcType="INTEGER" property="resolveOrder"/>
<result column="exception" jdbcType="LONGVARCHAR" property="exception"/>
@@ -47,7 +46,12 @@
<select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String">
select count(1)
from t_flink_cluster
- where cluster_id=#{clusterId}
+ <where>
+ cluster_id=#{clusterId}
+ <if test="id != null">
+ and id &lt;&gt; #{id}
+ </if>
+ </where>
limit 1
</select>
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index b6abd4089..fb5f1777b 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -126,7 +126,6 @@ export interface AppListRecord {
createTimeTo?: any;
backUpDescription?: any;
yarnQueue?: any;
- yarnSessionClusterId?: any;
teamIdList?: any;
teamName: string;
flinkRestUrl?: any;
@@ -178,5 +177,4 @@ export interface CreateParams {
clusterId: string;
flinkClusterId: string;
flinkImage?: any;
- yarnSessionClusterId?: any;
}
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index b1949498c..2fb05ea34 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -130,8 +130,10 @@ export enum ClusterStateEnum {
CREATED = 0,
/** cluster started */
STARTED = 1,
- /** cluster stopped */
- STOPED = 2,
+ /** cluster canceled */
+ CANCELED = 2,
+ /** cluster lost */
+ LOST = 3,
}
export enum AppTypeEnum {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index b68483776..9c2907ce9 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -41,7 +41,6 @@ export default {
podTemplate: 'Kubernetes Pod Template',
flinkCluster: 'Flink Cluster',
yarnQueue: 'Yarn Queue',
- yarnSessionClusterId: 'Yarn Session ClusterId',
mavenPom: 'Maven pom',
uploadJar: 'Upload Jar',
kubernetesNamespace: 'Kubernetes Namespace',
@@ -212,7 +211,6 @@ export default {
tmPlaceholder: 'Please select the resource parameters to set',
yarnQueuePlaceholder: 'Please enter yarn queue',
descriptionPlaceholder: 'Please enter description for this application',
- yarnSessionClusterIdPlaceholder: 'Please Select Yarn Session clusterId',
kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g: default',
kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId',
kubernetesClusterIdIsRequiredMessage: 'Kubernetes clusterId is required',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
index 8e3ffd32d..6c16c7890 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/setting.ts
@@ -89,7 +89,7 @@ export default {
},
required: {
address: 'cluster address is required',
- clusterId: 'Yarn Session ClusterId is required',
+ clusterId: 'Yarn Session Cluster is required',
},
},
env: {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index e86789515..ed01275be 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -41,7 +41,6 @@ export default {
podTemplate: 'Kubernetes Pod 模板',
flinkCluster: 'Flink集群',
yarnQueue: 'Yarn队列',
- yarnSessionClusterId: 'Yarn session模式集群',
mavenPom: 'maven pom',
uploadJar: '上传依赖Jar文件',
kubernetesNamespace: 'K8S命名空间',
@@ -207,7 +206,6 @@ export default {
tmPlaceholder: '请选择要设置的资源参数',
yarnQueuePlaceholder: '请输入yarn队列名称',
descriptionPlaceholder: '请输入此应用程序的描述',
- yarnSessionClusterIdPlaceholder: '请选择 Yarn Session 集群',
kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
index b4107a5d1..800027de2 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/setting.ts
@@ -88,7 +88,7 @@ export default {
},
required: {
address: '必须填写集群地址',
- clusterId: 'Yarn Session ClusterId 为必填项',
+ clusterId: 'Yarn Session Cluster 为必填项',
},
},
env: {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 9979c3220..0682c30c8 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -156,19 +156,10 @@
unref(flinkClusters).filter((c) => {
if (values.flinkClusterId) {
return c.id == values.flinkClusterId && c.clusterState === ClusterStateEnum.STARTED;
- } else {
- return (
- c.clusterId == values.yarnSessionClusterId &&
- c.clusterState === ClusterStateEnum.STARTED
- );
}
})[0] || null;
if (cluster) {
- Object.assign(values, {
- clusterId: cluster.id,
- flinkClusterId: cluster.id,
- yarnSessionClusterId: cluster.clusterId,
- });
+ Object.assign(values, { flinkClusterId: cluster.id });
}
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 33729ed3f..53925d714 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -104,7 +104,6 @@
flinkImage: app.flinkImage,
k8sNamespace: app.k8sNamespace,
alertId: selectAlertId,
- yarnSessionClusterId: app.yarnSessionClusterId,
projectName: app.projectName,
module: app.module,
...resetParams,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 21571fb7b..520167d53 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -48,7 +48,7 @@
import { useGo } from '/@/hooks/web/usePage';
import ProgramArgs from './components/ProgramArgs.vue';
import VariableReview from './components/VariableReview.vue';
- import { ClusterStateEnum, JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum';
+ import { JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum';
const route = useRoute();
const go = useGo();
@@ -77,7 +77,6 @@
alerts,
flinkEnvs,
flinkSql,
- flinkClusters,
getEditStreamParkFormSchema,
registerDifferentDrawer,
suggestions,
@@ -124,7 +123,6 @@
flinkClusterId: app.flinkClusterId,
flinkImage: app.flinkImage,
k8sNamespace: app.k8sNamespace,
- yarnSessionClusterId: app.yarnSessionClusterId,
...resetParams,
};
console.log('resetParams', resetParams);
@@ -195,17 +193,6 @@
jar: unref(uploadJars),
});
}
- if (values.yarnSessionClusterId) {
- const cluster =
- flinkClusters.value.filter(
- (c) =>
- c.clusterId === values.yarnSessionClusterId &&
- c.clusterState === ClusterStateEnum.STARTED,
- )[0] || null;
- values.clusterId = cluster.id;
- values.flinkClusterId = cluster.id;
- values.yarnSessionClusterId = cluster.clusterId;
- }
let config = values.configOverride;
if (config != null && config.trim() !== '') {
config = encryptByBase64(config);
@@ -245,18 +232,6 @@
} else {
config = null;
}
- if (values.yarnSessionClusterId) {
- const cluster =
- flinkClusters.value.filter((c) => {
- return (
- c.clusterId === values.yarnSessionClusterId &&
- c.clusterState === ClusterStateEnum.STARTED
- );
- })[0] || null;
- values.clusterId = cluster.id;
- values.flinkClusterId = cluster.id;
- values.yarnSessionClusterId = cluster.clusterId;
- }
const configId = values.strategy == UseStrategyEnum.USE_EXIST ? app.configId : null;
const params = {
id: app.id,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 96fa616d5..b1610042a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -221,14 +221,12 @@ export const useCreateAndEditSchema = (
rules: [{ required: true, message: 'Flink Cluster is required' }],
},
{
- field: 'yarnSessionClusterId',
- label: t('flink.app.yarnSessionClusterId'),
+ field: 'flinkClusterId',
+ label: t('flink.app.flinkCluster'),
component: 'Select',
- componentProps: () => {
- return {
- placeholder: t('flink.app.addAppTips.yarnSessionClusterIdPlaceholder'),
- options: getExecutionCluster(ExecModeEnum.YARN_SESSION, 'clusterId'),
- };
+ componentProps: {
+ placeholder: t('flink.app.flinkCluster'),
+ options: getExecutionCluster(ExecModeEnum.YARN_SESSION, 'id'),
},
ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
rules: [{ required: true, message: 'Flink Cluster is required' }],
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
index 920e166e8..76f81eec9 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkSchema.ts
@@ -74,25 +74,25 @@ export const useFlinkSchema = (editModel?: string) => {
field: 'flinkClusterId',
label: t('flink.app.flinkCluster'),
component: 'Select',
- componentProps: {
- placeholder: t('flink.app.flinkCluster'),
- options: (getExecutionCluster(ExecModeEnum.REMOTE) || []).map((i) => ({
- label: i.clusterName,
- value: i.id,
- })),
+ componentProps: () => {
+ const options = getExecutionCluster(ExecModeEnum.REMOTE);
+ return {
+ placeholder: t('flink.app.flinkCluster'),
+ options: options.map((i) => ({ label: i.clusterName, value: i.id })),
+ };
},
ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
rules: [{ required: true, message: 'Flink Cluster is required' }],
},
{
- field: 'yarnSessionClusterId',
- label: t('flink.app.yarnSessionClusterId'),
+ field: 'flinkClusterId',
+ label: t('flink.app.flinkCluster'),
component: 'Select',
componentProps: () => {
const options = getExecutionCluster(ExecModeEnum.YARN_SESSION);
return {
- placeholder: t('flink.app.addAppTips.yarnSessionClusterIdPlaceholder'),
- options: options.map((i) => ({ label: i.clusterName, value: i.clusterId })),
+ placeholder: t('flink.app.flinkCluster'),
+ options: options.map((i) => ({ label: i.clusterName, value: i.id })),
};
},
ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 21d5d1944..25097521a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -271,7 +271,6 @@ export function handleSubmitParams(
clusterId: values.clusterId || null,
flinkClusterId: values.flinkClusterId || null,
flinkImage: values.flinkImage || null,
- yarnSessionClusterId: values.yarnSessionClusterId || null,
});
if (params.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
Object.assign(params, {
@@ -291,7 +290,3 @@ export const filterOption = (input: string, options: Recordable) => {
export function isK8sExecMode(mode: number): boolean {
return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode);
}
-// session mode
-export function isSessionMode(mode: number): boolean {
- return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
-}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index 0ee7c4afb..3a001023e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -50,9 +50,10 @@
const params = handleSubmitParams(values);
if (Object.keys(params).length > 0) {
const res = await fetchCheckCluster(params);
- if (res === 'success') {
+ const status = parseInt(res.status);
+ if (status === 0) {
const resp = await fetchCreateCluster(params);
- if (resp.status) {
+ if (resp) {
Swal.fire({
icon: 'success',
title: values.clusterName.concat(' create successful!'),
@@ -61,20 +62,10 @@
});
go('/flink/setting?activeKey=cluster');
} else {
- Swal.fire(resp.msg);
+ Swal.fire('Failed', 'create cluster failed, please check log', 'error');
}
- } else if (res === 'exists') {
- Swal.fire(
- 'Failed',
- 'the cluster name: ' + values.clusterName + ' is already exists,please check',
- 'error',
- );
} else {
- Swal.fire(
- 'Failed',
- 'the address is invalid or connection failure, please check',
- 'error',
- );
+ Swal.fire('Failed', res.msg, 'error');
}
}
} catch (error) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 0b58d3842..fc1c11676 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -65,31 +65,18 @@
id: cluster.id,
});
const res = await fetchCheckCluster(params);
- if (res === 'success') {
- const resp = await fetchUpdateCluster(params);
- if (resp.status) {
- Swal.fire({
- icon: 'success',
- title: values.clusterName.concat(' update successful!'),
- showConfirmButton: false,
- timer: 2000,
- });
- go('/flink/setting?activeKey=cluster');
- } else {
- Swal.fire(resp.data.msg);
- }
- } else if (res === 'exists') {
- Swal.fire(
- 'Failed',
- 'the cluster name: ' + values.clusterName + ' is already exists,please check',
- 'error',
- );
+ const status = parseInt(res.status);
+ if (status === 0) {
+ fetchUpdateCluster(params);
+ Swal.fire({
+ icon: 'success',
+ title: values.clusterName.concat(' update successful!'),
+ showConfirmButton: false,
+ timer: 2000,
+ });
+ go('/flink/setting?activeKey=cluster');
} else {
- Swal.fire(
- 'Failed',
- 'the address is invalid or connection failure, please check',
- 'error',
- );
+ Swal.fire('Failed', res.msg, 'error');
}
}
} catch (error) {
@@ -122,7 +109,6 @@
flinkImage: cluster.flinkImage,
serviceAccount: cluster.serviceAccount,
k8sConf: cluster.k8sConf,
- flameGraph: cluster.flameGraph,
k8sNamespace: cluster.k8sNamespace,
...resetParams,
});
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
index c9ee6b867..0d290255a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/components/FlinkClusterSetting.vue
@@ -16,7 +16,6 @@
-->
<script lang="ts">
import { defineComponent } from 'vue';
- import { exceptionPropWidth } from '/@/utils';
import { ClusterStateEnum, ExecModeEnum } from '/@/enums/flinkEnum';
export default defineComponent({
name: 'FlinkClusterSetting',
@@ -52,73 +51,28 @@
const { t } = useI18n();
const { Swal, createMessage } = useMessage();
const clusters = ref<FlinkCluster[]>([]);
- const optionClusters = {
- starting: new Map(),
- created: new Map(),
- stoped: new Map(),
- };
function isSessionMode(mode: number): boolean {
return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode);
}
- /* Get flink environmental data*/
- async function getFlinkClusterSetting() {
- const clusterList = await fetchFlinkCluster();
- clusters.value = clusterList;
- for (const key in clusterList) {
- const cluster = clusterList[key];
- if (cluster.clusterState === ClusterStateEnum.CREATED) {
- optionClusters.created.set(cluster.id, new Date().getTime());
- } else if (cluster.clusterState === ClusterStateEnum.STARTED) {
- optionClusters.starting.set(cluster.id, new Date().getTime());
- } else {
- optionClusters.stoped.set(cluster.id, new Date().getTime());
- }
- }
- }
-
function handleIsStart(item) {
- /**
- The cluster was just created but not started
- CREATED(0),
- cluster started
- STARTED(1),
- cluster stopped
- STOPED(2);
- */
- return optionClusters.starting.get(item.id);
+ return item.clusterState === ClusterStateEnum.STARTED;
}
/* Go to edit cluster */
function handleEditCluster(item: FlinkCluster) {
go(`/flink/setting/edit_cluster?clusterId=${item.id}`);
}
/* deploy */
- async function handleDeployCluser(item: FlinkCluster) {
+ async function handleDeployCluster(item: FlinkCluster) {
const hide = createMessage.loading('The current cluster is starting', 0);
try {
- const { data } = await fetchClusterStart(item.id);
- if (data?.data?.status) {
- // optionClusters.starting.set(item.id, new Date().getTime());
- handleMapUpdate('starting');
- getFlinkClusterSetting();
- Swal.fire({
- icon: 'success',
- title: 'The current cluster is started',
- showConfirmButton: false,
- timer: 2000,
- });
- } else {
- Swal.fire({
- title: 'Failed',
- icon: 'error',
- width: exceptionPropWidth(),
- html: '<pre class="propsException">' + data?.data?.msg + '</pre>',
- showCancelButton: true,
- confirmButtonColor: '#55BDDDFF',
- confirmButtonText: 'OK',
- cancelButtonText: 'Close',
- });
- }
+ await fetchClusterStart(item.id);
+ await Swal.fire({
+ icon: 'success',
+ title: 'The current cluster is started',
+ showConfirmButton: false,
+ timer: 2000,
+ });
} catch (error) {
console.error(error);
} finally {
@@ -127,36 +81,15 @@
}
/* delete */
async function handleDelete(item: FlinkCluster) {
- const { data } = await fetchClusterRemove(item.id);
- if (data?.data?.status) {
- // optionClusters.starting.delete(item.id);
- handleMapUpdate('starting');
- getFlinkClusterSetting();
- createMessage.success('The current cluster is remove');
- }
+ await fetchClusterRemove(item.id);
+ createMessage.success('The current cluster is remove');
}
/* shutdown */
async function handleShutdownCluster(item: FlinkCluster) {
const hide = createMessage.loading('The current cluster is canceling', 0);
try {
- const { data } = await fetchClusterShutdown(item.id);
- if (data?.data?.status) {
- // optionClusters.starting.delete(item.id);
- handleMapUpdate('starting');
- getFlinkClusterSetting();
- createMessage.success('The current cluster is shutdown');
- } else {
- Swal.fire({
- title: 'Failed',
- icon: 'error',
- width: exceptionPropWidth(),
- html: '<pre class="propsException">' + data.data.msg + '</pre>',
- showCancelButton: true,
- confirmButtonColor: '#55BDDDFF',
- confirmButtonText: 'OK',
- cancelButtonText: 'Close',
- });
- }
+ await fetchClusterShutdown(item.id);
+ createMessage.success('The current cluster is shutdown');
} catch (error) {
console.error(error);
} finally {
@@ -164,13 +97,14 @@
}
}
- function handleMapUpdate(type: string) {
- const map = optionClusters[type];
- optionClusters[type] = new Map(map);
+ async function getFlinkCluster() {
+ const clusterList = await fetchFlinkCluster();
+ clusters.value = clusterList;
}
onMounted(() => {
- getFlinkClusterSetting();
+ getFlinkCluster();
+ setInterval(() => getFlinkCluster(), 1000 * 3);
});
</script>
<template>
@@ -218,19 +152,8 @@
<template #actions>
<Tooltip :title="t('flink.setting.cluster.edit')">
<a-button
- v-if="handleIsStart(item) && item.executionMode == ExecModeEnum.YARN_SESSION"
- v-auth="'cluster:update'"
- :disabled="true"
- @click="handleEditCluster(item)"
- shape="circle"
- size="large"
- class="control-button"
- >
- <EditOutlined />
- </a-button>
- <a-button
- v-if="!handleIsStart(item) || item.executionMode == ExecModeEnum.REMOTE"
v-auth="'cluster:update'"
+ :disabled="handleIsStart(item)"
@click="handleEditCluster(item)"
shape="circle"
size="large"
@@ -239,40 +162,10 @@
<EditOutlined />
</a-button>
</Tooltip>
- <template v-if="!handleIsStart(item)">
- <Tooltip :title="t('flink.setting.cluster.start')">
- <a-button
- v-if="isSessionMode(item.executionMode)"
- v-auth="'cluster:create'"
- @click="handleDeployCluser(item)"
- shape="circle"
- size="large"
- class="control-button"
- >
- <PlayCircleOutlined />
- </a-button>
- <a-button
- v-else
- :disabled="true"
- v-auth="'cluster:create'"
- shape="circle"
- size="large"
- style="margin-left: 3px"
- class="control-button"
- >
- <PlayCircleOutlined />
- </a-button>
- </Tooltip>
- </template>
-
- <template v-else>
+ <template v-if="handleIsStart(item)">
<Tooltip :title="t('flink.setting.cluster.stop')">
<a-button
- v-if="
- [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(
- item.executionMode,
- )
- "
+ :disabled="item.executionMode === ExecModeEnum.REMOTE"
v-auth="'cluster:create'"
@click="handleShutdownCluster(item)"
shape="circle"
@@ -282,38 +175,31 @@
>
<PauseCircleOutlined />
</a-button>
+ </Tooltip>
+ </template>
+ <template v-else>
+ <Tooltip :title="t('flink.setting.cluster.start')">
<a-button
- v-else
- :disabled="true"
+ :disabled="!isSessionMode(item.executionMode)"
v-auth="'cluster:create'"
+ @click="handleDeployCluster(item)"
shape="circle"
size="large"
class="control-button"
>
- <PauseCircleOutlined />
+ <PlayCircleOutlined />
</a-button>
</Tooltip>
</template>
-
<Tooltip :title="t('flink.setting.cluster.detail')">
<a-button
- v-if="!handleIsStart(item)"
+ :disabled="!handleIsStart(item)"
v-auth="'app:detail'"
- :disabled="true"
shape="circle"
- size="large"
- class="control-button"
- >
- <EyeOutlined />
- </a-button>
- <a-button
- v-else
- v-auth="'app:detail'"
- shape="circle"
- size="large"
- class="control-button"
:href="item.address"
target="_blank"
+ size="large"
+ class="control-button"
>
<EyeOutlined />
</a-button>
@@ -325,7 +211,13 @@
:ok-text="t('common.yes')"
@confirm="handleDelete(item)"
>
- <a-button type="danger" shape="circle" size="large" class="control-button">
+ <a-button
+ :disabled="item.clusterState === ClusterStateEnum.STARTED"
+ type="danger"
+ shape="circle"
+ size="large"
+ class="control-button"
+ >
<DeleteOutlined />
</a-button>
</Popconfirm>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 77de7dca3..0cd2a5ab3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -34,7 +34,7 @@ import {
fetchK8sNamespaces,
fetchSessionClusterIds,
} from '/@/api/flink/app/flinkHistory';
-import { handleFormValue, isSessionMode } from '../../app/utils';
+import { handleFormValue } from '../../app/utils';
import { useMessage } from '/@/hooks/web/useMessage';
import { ClusterAddTypeEnum } from '/@/enums/appEnum';
import { useI18n } from '/@/hooks/web/useI18n';
@@ -89,6 +89,22 @@ export const useClusterSetting = () => {
}
}
}
+
+ function isAddExistYarnSession(value: Recordable) {
+ return (
+ value.executionMode == ExecModeEnum.YARN_SESSION &&
+ value.addType == ClusterAddTypeEnum.ADD_EXISTING
+ );
+ }
+
+ // session mode
+ function isShowInSessionMode(value: Recordable): boolean {
+ if (value.executionMode == ExecModeEnum.YARN_SESSION) {
+ return value.addType == ClusterAddTypeEnum.ADD_NEW;
+ }
+ return value.executionMode == ExecModeEnum.KUBERNETES_SESSION;
+ }
+
const getClusterSchema = computed((): FormSchema[] => {
return [
{
@@ -127,18 +143,10 @@ export const useClusterSetting = () => {
},
rules: [{ required: true, message: 'Flink Version is required' }],
},
- {
- field: 'yarnQueue',
- label: 'Yarn Queue',
- component: 'Input',
- componentProps: {
- placeholder: 'Please enter yarn queue',
- },
- ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
- },
{
field: 'addType',
label: t('flink.setting.cluster.form.addType'),
+ ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION,
component: 'Select',
defaultValue: ClusterAddTypeEnum.ADD_EXISTING,
componentProps: {
@@ -156,6 +164,17 @@ export const useClusterSetting = () => {
],
},
},
+ {
+ field: 'yarnQueue',
+ label: 'Yarn Queue',
+ component: 'Input',
+ componentProps: {
+ placeholder: 'Please enter yarn queue',
+ },
+ ifShow: ({ values }) =>
+ values.executionMode == ExecModeEnum.YARN_SESSION &&
+ values.addType == ClusterAddTypeEnum.ADD_NEW,
+ },
{
field: 'address',
label: 'Address',
@@ -168,19 +187,18 @@ export const useClusterSetting = () => {
: 'Please enter cluster address, e.g: http://host:port',
};
},
- ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING,
+ ifShow: ({ values }) =>
+ isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE,
rules: [{ required: true, message: t('flink.setting.cluster.required.address') }],
},
{
field: 'clusterId',
- label: 'Yarn Session ClusterId',
+ label: 'Yarn Session Cluster',
component: 'Input',
componentProps: {
- placeholder: 'Please enter Yarn Session clusterId',
+ placeholder: 'Please enter Yarn Session cluster',
},
- ifShow: ({ values }) =>
- values.addType == ClusterAddTypeEnum.ADD_EXISTING &&
- values.executionMode == ExecModeEnum.YARN_SESSION,
+ ifShow: ({ values }) => isAddExistYarnSession(values),
rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }],
},
{
@@ -254,15 +272,15 @@ export const useClusterSetting = () => {
{
field: 'resolveOrder',
label: 'Resolve Order',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder },
- rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }],
+ rules: [{ message: 'Resolve Order is required', type: 'number' }],
},
{
field: 'slot',
label: 'Task Slots',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'InputNumber',
componentProps: {
placeholder: 'Number of slots per TaskManager',
@@ -274,14 +292,14 @@ export const useClusterSetting = () => {
{
field: 'totalOptions',
label: 'Total Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams),
},
{
field: 'totalItem',
label: 'totalItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'totalOptions', field, '.memory', true),
@@ -289,7 +307,7 @@ export const useClusterSetting = () => {
{
field: 'jmOptions',
label: 'JM Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: {
showSearch: true,
@@ -304,7 +322,7 @@ export const useClusterSetting = () => {
{
field: 'jmOptionsItem',
label: 'jmOptionsItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'),
@@ -312,7 +330,7 @@ export const useClusterSetting = () => {
{
field: 'tmOptions',
label: 'TM Memory Options',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
componentProps: {
showSearch: true,
@@ -327,7 +345,7 @@ export const useClusterSetting = () => {
{
field: 'tmOptionsItem',
label: 'tmOptionsItem',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Select',
renderColContent: ({ model, field }) =>
renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'),
@@ -335,7 +353,7 @@ export const useClusterSetting = () => {
{
field: 'dynamicProperties',
label: 'Dynamic Properties',
- ifShow: ({ values }) => isSessionMode(values.executionMode),
+ ifShow: ({ values }) => isShowInSessionMode(values),
component: 'Input',
render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams),
},
@@ -353,7 +371,6 @@ export const useClusterSetting = () => {
function handleSubmitParams(values: Recordable) {
const options = handleFormValue(values);
const params = {
- clusterId: values.clusterId || null,
clusterName: values.clusterName,
executionMode: values.executionMode,
versionId: values.versionId,
@@ -366,17 +383,23 @@ export const useClusterSetting = () => {
});
return params;
case ExecModeEnum.YARN_SESSION:
- Object.assign(params, {
- options: JSON.stringify(options),
- yarnQueue: values.yarnQueue || 'default',
- dynamicProperties: values.dynamicProperties,
- resolveOrder: values.resolveOrder,
- address: values.address,
- flameGraph: values.flameGraph,
- });
+ if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
+ Object.assign(params, {
+ clusterId: values.clusterId,
+ address: values.address,
+ });
+ } else {
+ Object.assign(params, {
+ options: JSON.stringify(options),
+ yarnQueue: values.yarnQueue || 'default',
+ dynamicProperties: values.dynamicProperties,
+ resolveOrder: values.resolveOrder,
+ });
+ }
return params;
case ExecModeEnum.KUBERNETES_SESSION:
Object.assign(params, {
+ clusterId: values.clusterId,
options: JSON.stringify(options),
dynamicProperties: values.dynamicProperties,
resolveOrder: values.resolveOrder,
@@ -386,7 +409,6 @@ export const useClusterSetting = () => {
k8sConf: values.k8sConf,
flinkImage: values.flinkImage || null,
address: values.address,
- flameGraph: values.flameGraph,
});
return params;
default:
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 278d46758..264653d49 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
case class DeployRequest(flinkVersion: FlinkVersion,
clusterId: String,
executionMode: ExecutionMode,
- flameGraph: JavaMap[String, java.io.Serializable],
properties: JavaMap[String, Any],
@Nullable k8sDeployParam: KubernetesDeployParam) {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index f352732da..800de3059 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -17,6 +17,4 @@
package org.apache.streampark.flink.submit.bean
-case class DeployResponse(address: String,
- clusterId: String,
- message: String = null)
+case class DeployResponse(address: String, clusterId: String)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index cc701f310..84cc59363 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -88,5 +88,4 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
super.doCancel(cancelRequest, flinkConfig)
}
-
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 9e91914ff..9872d082a 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
- | flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
@@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
if (client.getWebInterfaceURL != null) {
- DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+ DeployResponse(client.getWebInterfaceURL, client.getClusterId)
} else {
null
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
index 5022e6ec1..7cf8a7d43 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/LocalSubmit.scala
@@ -50,8 +50,7 @@ object LocalSubmit extends FlinkSubmitTrait {
val jobGraph = packageProgramJobGraph._2
client = createLocalCluster(flinkConfig)
val jobId = client.submitJob(jobGraph).get().toString
- val result = SubmitResponse(jobId, flinkConfig.toMap, jobId)
- result
+ SubmitResponse(jobId, flinkConfig.toMap, jobId)
} catch {
case e: Exception =>
logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 7c87a4ae7..e8a84264f 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -30,7 +30,6 @@ import java.io.File
import java.lang.{Integer => JavaInt}
import scala.util.{Failure, Success, Try}
-
/**
* Submit Job to Remote Cluster
*/
@@ -66,7 +65,7 @@ object RemoteSubmit extends FlinkSubmitTrait {
try {
client = standAloneDescriptor._2.retrieve(standAloneDescriptor._1).getClusterClient
val jobID = JobID.fromHexString(cancelRequest.jobId)
- val actionResult = cancelJob(cancelRequest, jobID, client)
+ val actionResult = super.cancelJob(cancelRequest, jobID, client)
CancelResponse(actionResult)
} catch {
case e: Exception =>
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 21b216e77..3fa415c46 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -17,7 +17,6 @@
package org.apache.streampark.flink.submit.impl
-import org.apache.streampark.common.conf.ConfigConst.KEY_YARN_APP_ID
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.submit.`trait`.YarnSubmitTrait
import org.apache.streampark.flink.submit.bean._
@@ -49,7 +48,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
flinkConfig
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-
logInfo(
s"""
|------------------------------------------------------------------
@@ -137,8 +135,9 @@ object YarnSessionSubmit extends YarnSubmitTrait {
}
override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
- flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.properties.get(KEY_YARN_APP_ID).toString)
- flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ flinkConfig
+ .safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.clusterId)
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
logInfo(
s"""
|------------------------------------------------------------------
@@ -153,7 +152,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
clusterDescriptor = yarnClusterDescriptor._2
client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
val jobID = JobID.fromHexString(cancelRequest.jobId)
- val actionResult = cancelJob(cancelRequest, jobID, client)
+ val actionResult = super.cancelJob(cancelRequest, jobID, client)
CancelResponse(actionResult)
} catch {
case e: Exception => logError(s"stop flink yarn session job fail")
@@ -172,7 +171,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
- | flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
@@ -230,7 +228,8 @@ object YarnSessionSubmit extends YarnSubmitTrait {
clusterDescriptor = yarnClusterDescriptor._2
if (FinalApplicationStatus.UNDEFINED.equals(clusterDescriptor.getYarnClient.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId)).getFinalApplicationStatus)) {
val clientProvider = clusterDescriptor.retrieve(yarnClusterDescriptor._1)
- clientProvider.getClusterClient.shutDownCluster()
+ client = clientProvider.getClusterClient
+ client.shutDownCluster()
}
logInfo(s"the ${shutDownRequest.clusterId}'s final status is ${clusterDescriptor.getYarnClient.getApplicationReport(ConverterUtils.toApplicationId(shutDownRequest.clusterId)).getFinalApplicationStatus}")
ShutDownResponse()
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
index 7e7d85e82..a9c3a57f4 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/KubernetesNativeSubmitTrait.scala
@@ -95,10 +95,9 @@ trait KubernetesNativeSubmitTrait extends FlinkSubmitTrait {
clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
client = clusterDescriptor.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)).getClusterClient
val jobID = JobID.fromHexString(cancelRequest.jobId)
- val actionResult = cancelJob(cancelRequest, jobID, client)
+ val actionResult = super.cancelJob(cancelRequest, jobID, client)
IngressController.deleteIngress(cancelRequest.clusterId, cancelRequest.kubernetesNamespace)
CancelResponse(actionResult)
-
} catch {
case e: Exception =>
logger.error(s"[flink-submit] stop flink job failed, mode=${flinkConfig.get(DeploymentOptions.TARGET)}, cancelRequest=${cancelRequest}")
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
index 1735f6bf9..c667b15fe 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/YarnSubmitTrait.scala
@@ -37,9 +37,7 @@ import scala.util.Try
trait YarnSubmitTrait extends FlinkSubmitTrait {
override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
-
val jobID = getJobID(cancelRequest.jobId)
-
val clusterClient = {
flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.clusterId)
val clusterClientFactory = new YarnClusterClientFactory
@@ -51,7 +49,7 @@ trait YarnSubmitTrait extends FlinkSubmitTrait {
clusterDescriptor.retrieve(applicationId).getClusterClient
}
Try {
- val savepointDir = cancelJob(cancelRequest, jobID, clusterClient)
+ val savepointDir = super.cancelJob(cancelRequest, jobID, clusterClient)
CancelResponse(savepointDir)
}.recover {
case e => throw new FlinkException(s"[StreamPark] Triggering a savepoint for the job ${cancelRequest.jobId} failed. detail: ${ExceptionUtils.stringifyException(e)}");