You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/26 03:31:36 UTC
[incubator-streampark] branch session-cluster updated: [Bug] flink cluster bug fixed
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/session-cluster by this push:
new add72fef7 [Bug] flink cluster bug fixed
add72fef7 is described below
commit add72fef77d888c5463a2cc8eece6a85a7b3edb9
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 11:31:13 2022 +0800
[Bug] flink cluster bug fixed
---
.../core/controller/FlinkClusterController.java | 19 ++--
.../console/core/entity/FlinkCluster.java | 6 +-
.../console/core/service/FlinkClusterService.java | 10 +--
.../core/service/impl/FlinkClusterServiceImpl.java | 100 +++++++--------------
.../src/views/flink/setting/AddCluster.vue | 4 +-
.../src/views/flink/setting/EditCluster.vue | 20 ++---
.../views/flink/setting/hooks/useClusterSetting.ts | 3 +-
.../flink/submit/bean/DeployResponse.scala | 4 +-
8 files changed, 65 insertions(+), 101 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 2482ad13c..11dc038f7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -64,8 +64,8 @@ public class FlinkClusterController {
@PostMapping("create")
@RequiresPermissions("cluster:create")
public RestResponse create(FlinkCluster cluster) {
- ResponseResult result = flinkClusterService.create(cluster);
- return RestResponse.success(result);
+ Boolean success = flinkClusterService.create(cluster);
+ return RestResponse.success(success);
}
@PostMapping("update")
@@ -88,7 +88,8 @@ public class FlinkClusterController {
flinkCluster.setResolveOrder(cluster.getResolveOrder());
flinkCluster.setServiceAccount(cluster.getServiceAccount());
flinkCluster.setDescription(cluster.getDescription());
- return RestResponse.success(flinkClusterService.update(flinkCluster));
+ flinkClusterService.update(flinkCluster);
+ return RestResponse.success();
}
@PostMapping("get")
@@ -100,21 +101,21 @@ public class FlinkClusterController {
@PostMapping("start")
public RestResponse start(FlinkCluster flinkCluster) {
FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult start = flinkClusterService.start(cluster);
- return RestResponse.success(start);
+ flinkClusterService.start(cluster);
+ return RestResponse.success();
}
@PostMapping("shutdown")
public RestResponse shutdown(FlinkCluster flinkCluster) {
FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult shutdown = flinkClusterService.shutdown(cluster);
- return RestResponse.success(shutdown);
+ flinkClusterService.shutdown(cluster);
+ return RestResponse.success();
}
@PostMapping("delete")
public RestResponse delete(FlinkCluster flinkCluster) {
FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
- ResponseResult delete = flinkClusterService.delete(cluster);
- return RestResponse.success(delete);
+ flinkClusterService.delete(cluster);
+ return RestResponse.success();
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 965bfbda4..22c78e239 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -144,7 +144,7 @@ public class FlinkCluster implements Serializable {
return null;
}
- public boolean verifyConnection() {
+ public boolean verifyRemoteCluster() {
if (address == null) {
return false;
}
@@ -156,7 +156,9 @@ public class FlinkCluster implements Serializable {
return false;
}
try {
- HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
+ String overviewUrl = url + "/overview";
+ String result = HttpClientUtils.httpGetRequest(overviewUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+ JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception ignored) {
//
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 2be56cb3e..ecf775994 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -26,15 +26,15 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
ResponseResult check(FlinkCluster flinkCluster);
- ResponseResult create(FlinkCluster flinkCluster);
+ Boolean create(FlinkCluster flinkCluster);
- ResponseResult delete(FlinkCluster flinkCluster);
+ void delete(FlinkCluster flinkCluster);
- ResponseResult update(FlinkCluster flinkCluster);
+ void update(FlinkCluster flinkCluster);
- ResponseResult start(FlinkCluster flinkCluster);
+ void start(FlinkCluster flinkCluster);
- ResponseResult shutdown(FlinkCluster flinkCluster);
+ void shutdown(FlinkCluster flinkCluster);
Boolean existsByClusterId(String clusterId, Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 8217aee40..d35731509 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -105,7 +106,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
// 3) Check connection
if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
- if (!cluster.verifyConnection()) {
+ if (!cluster.verifyRemoteCluster()) {
result.setMsg("the remote cluster connection failed, please check!");
result.setStatus(3);
return result;
@@ -123,8 +124,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public ResponseResult create(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
flinkCluster.setCreateTime(new Date());
if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
@@ -138,20 +138,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
}
- try {
- save(flinkCluster);
- result.setStatus(1);
- } catch (Exception e) {
- result.setStatus(0);
- result.setMsg("create cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- }
- return result;
+ return save(flinkCluster);
}
@Override
@Transactional(rollbackFor = {Exception.class})
- public ResponseResult start(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void start(FlinkCluster flinkCluster) {
LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
try {
@@ -170,9 +162,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.getK8sRestExposedTypeEnum());
break;
default:
- result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
}
FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
DeployRequest deployRequest = new DeployRequest(
@@ -186,49 +176,35 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
Future<DeployResponse> future = executorService.submit(() -> FlinkSubmitter.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
if (null != deployResponse) {
- if (deployResponse.message() == null) {
- updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
- updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
- updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
- updateWrapper.set(FlinkCluster::getException, null);
- update(updateWrapper);
- result.setStatus(1);
- FlinkTrackingTask.removeFlinkCluster(flinkCluster);
- } else {
- result.setStatus(0);
- result.setMsg("deploy cluster failed," + deployResponse.message());
- }
+ updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+ updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
+ updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
+ updateWrapper.set(FlinkCluster::getException, null);
+ update(updateWrapper);
+ FlinkTrackingTask.removeFlinkCluster(flinkCluster);
} else {
- result.setStatus(0);
- result.setMsg("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+ throw new ApiAlertException("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
}
- return result;
} catch (Exception e) {
log.error(e.getMessage(), e);
updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
updateWrapper.set(FlinkCluster::getException, e.toString());
update(updateWrapper);
- result.setStatus(0);
throw new ApiDetailException(e);
}
}
@Override
- public ResponseResult update(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void update(FlinkCluster flinkCluster) {
try {
updateById(flinkCluster);
- result.setStatus(1);
} catch (Exception e) {
- result.setStatus(0);
- result.setMsg("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+ throw new ApiDetailException("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
}
- return result;
}
@Override
- public ResponseResult shutdown(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void shutdown(FlinkCluster flinkCluster) {
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
String clusterId = flinkCluster.getClusterId();
KubernetesDeployParam kubernetesDeployParam = null;
@@ -245,14 +221,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.getK8sRestExposedTypeEnum());
break;
default:
- result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
}
if (StringUtils.isBlank(clusterId)) {
- result.setMsg("the clusterId is Empty!");
- result.setStatus(0);
- return result;
+ throw new ApiAlertException("the clusterId can not be empty!");
}
FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
ShutDownRequest stopRequest = new ShutDownRequest(
@@ -270,19 +242,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
if (null != shutDownResponse) {
updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
update(updateWrapper);
- result.setStatus(1);
- return result;
+ } else {
+ throw new ApiAlertException("clusterId is not exists!");
}
- result.setStatus(1);
- result.setMsg("clusterId is not exists!");
- return result;
} catch (Exception e) {
log.error(e.getMessage(), e);
updateWrapper.set(FlinkCluster::getException, e.toString());
update(updateWrapper);
- result.setStatus(0);
- result.setMsg("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- return result;
+ throw new ApiDetailException("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
}
}
@@ -297,24 +264,23 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public ResponseResult delete(FlinkCluster flinkCluster) {
- ResponseResult result = new ResponseResult();
+ public void delete(FlinkCluster flinkCluster) {
if (StringUtils.isNoneBlank(flinkCluster.getClusterId())
&& ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())
&& !ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
- result = shutdown(flinkCluster);
- if (0 == result.getStatus()) {
- return result;
+ try {
+ shutdown(flinkCluster);
+ } catch (Exception e) {
+ if (e instanceof ApiDetailException) {
+ throw e;
+ } else if (e instanceof ApiAlertException) {
+ throw new ApiAlertException("shutdown cluster failed: " + e.getMessage());
+ } else {
+ throw new ApiDetailException("shutdown cluster failed: " + ExceptionUtils.getStackTrace(e));
+ }
}
}
- try {
- removeById(flinkCluster.getId());
- result.setStatus(1);
- } catch (Exception e) {
- result.setStatus(0);
- result.setMsg("delete cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
- }
- return result;
+ removeById(flinkCluster.getId());
}
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index f67f0cbe7..3a001023e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -53,7 +53,7 @@
const status = parseInt(res.status);
if (status === 0) {
const resp = await fetchCreateCluster(params);
- if (resp.status) {
+ if (resp) {
Swal.fire({
icon: 'success',
title: values.clusterName.concat(' create successful!'),
@@ -62,7 +62,7 @@
});
go('/flink/setting?activeKey=cluster');
} else {
- Swal.fire(resp.msg);
+ Swal.fire('Failed', 'create cluster failed, please check log', 'error');
}
} else {
Swal.fire('Failed', res.msg, 'error');
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 32836455e..fc1c11676 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -67,18 +67,14 @@
const res = await fetchCheckCluster(params);
const status = parseInt(res.status);
if (status === 0) {
- const resp = await fetchUpdateCluster(params);
- if (resp.status) {
- Swal.fire({
- icon: 'success',
- title: values.clusterName.concat(' update successful!'),
- showConfirmButton: false,
- timer: 2000,
- });
- go('/flink/setting?activeKey=cluster');
- } else {
- Swal.fire(resp.data.msg);
- }
+ fetchUpdateCluster(params);
+ Swal.fire({
+ icon: 'success',
+ title: values.clusterName.concat(' update successful!'),
+ showConfirmButton: false,
+ timer: 2000,
+ });
+ go('/flink/setting?activeKey=cluster');
} else {
Swal.fire('Failed', res.msg, 'error');
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 74698ad46..b6d4acbc5 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -371,7 +371,6 @@ export const useClusterSetting = () => {
function handleSubmitParams(values: Recordable) {
const options = handleFormValue(values);
const params = {
- clusterId: values.clusterId || null,
clusterName: values.clusterName,
executionMode: values.executionMode,
versionId: values.versionId,
@@ -386,6 +385,7 @@ export const useClusterSetting = () => {
case ExecModeEnum.YARN_SESSION:
if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
Object.assign(params, {
+ clusterId: values.clusterId,
address: values.address,
});
} else {
@@ -399,6 +399,7 @@ export const useClusterSetting = () => {
return params;
case ExecModeEnum.KUBERNETES_SESSION:
Object.assign(params, {
+ clusterId: values.clusterId,
options: JSON.stringify(options),
dynamicProperties: values.dynamicProperties,
resolveOrder: values.resolveOrder,
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index f352732da..800de3059 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -17,6 +17,4 @@
package org.apache.streampark.flink.submit.bean
-case class DeployResponse(address: String,
- clusterId: String,
- message: String = null)
+case class DeployResponse(address: String, clusterId: String)