Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/26 03:31:36 UTC

[incubator-streampark] branch session-cluster updated: [Bug] flink cluster bug fixed

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch session-cluster
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/session-cluster by this push:
     new add72fef7 [Bug] flink cluster bug fixed
add72fef7 is described below

commit add72fef77d888c5463a2cc8eece6a85a7b3edb9
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 26 11:31:13 2022 +0800

    [Bug] flink cluster bug fixed
---
 .../core/controller/FlinkClusterController.java    |  19 ++--
 .../console/core/entity/FlinkCluster.java          |   6 +-
 .../console/core/service/FlinkClusterService.java  |  10 +--
 .../core/service/impl/FlinkClusterServiceImpl.java | 100 +++++++--------------
 .../src/views/flink/setting/AddCluster.vue         |   4 +-
 .../src/views/flink/setting/EditCluster.vue        |  20 ++---
 .../views/flink/setting/hooks/useClusterSetting.ts |   3 +-
 .../flink/submit/bean/DeployResponse.scala         |   4 +-
 8 files changed, 65 insertions(+), 101 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 2482ad13c..11dc038f7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -64,8 +64,8 @@ public class FlinkClusterController {
     @PostMapping("create")
     @RequiresPermissions("cluster:create")
     public RestResponse create(FlinkCluster cluster) {
-        ResponseResult result = flinkClusterService.create(cluster);
-        return RestResponse.success(result);
+        Boolean success = flinkClusterService.create(cluster);
+        return RestResponse.success(success);
     }
 
     @PostMapping("update")
@@ -88,7 +88,8 @@ public class FlinkClusterController {
         flinkCluster.setResolveOrder(cluster.getResolveOrder());
         flinkCluster.setServiceAccount(cluster.getServiceAccount());
         flinkCluster.setDescription(cluster.getDescription());
-        return RestResponse.success(flinkClusterService.update(flinkCluster));
+        flinkClusterService.update(flinkCluster);
+        return RestResponse.success();
     }
 
     @PostMapping("get")
@@ -100,21 +101,21 @@ public class FlinkClusterController {
     @PostMapping("start")
     public RestResponse start(FlinkCluster flinkCluster) {
         FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult start = flinkClusterService.start(cluster);
-        return RestResponse.success(start);
+        flinkClusterService.start(cluster);
+        return RestResponse.success();
     }
 
     @PostMapping("shutdown")
     public RestResponse shutdown(FlinkCluster flinkCluster) {
         FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult shutdown = flinkClusterService.shutdown(cluster);
-        return RestResponse.success(shutdown);
+        flinkClusterService.shutdown(cluster);
+        return RestResponse.success();
     }
 
     @PostMapping("delete")
     public RestResponse delete(FlinkCluster flinkCluster) {
         FlinkCluster cluster = flinkClusterService.getById(flinkCluster.getId());
-        ResponseResult delete = flinkClusterService.delete(cluster);
-        return RestResponse.success(delete);
+        flinkClusterService.delete(cluster);
+        return RestResponse.success();
     }
 }
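
Note on the controller change: with the service layer now throwing ApiAlertException / ApiDetailException instead of
returning ResponseResult, the endpoints above answer with a bare RestResponse.success() and leave failure reporting to
an exception-translation layer. A minimal sketch of such a layer, assuming Spring MVC; the advice class and its response
shape are illustrative, not the project's actual handler:

    import org.apache.streampark.console.base.exception.ApiAlertException;
    import org.apache.streampark.console.base.exception.ApiDetailException;

    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.ResponseStatus;
    import org.springframework.web.bind.annotation.RestControllerAdvice;

    // Hypothetical advice class: turns the service-layer exceptions into HTTP error responses,
    // which is why FlinkClusterController can return RestResponse.success() unconditionally.
    @RestControllerAdvice
    public class ClusterExceptionAdvice {

        @ExceptionHandler(ApiAlertException.class)
        @ResponseStatus(HttpStatus.BAD_REQUEST)
        public String handleAlert(ApiAlertException e) {
            // short, user-correctable message, e.g. "the clusterId cannot be empty!"
            return e.getMessage();
        }

        @ExceptionHandler(ApiDetailException.class)
        @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
        public String handleDetail(ApiDetailException e) {
            // detailed message, typically carrying stack-trace text
            return e.getMessage();
        }
    }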
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 965bfbda4..22c78e239 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -144,7 +144,7 @@ public class FlinkCluster implements Serializable {
         return null;
     }
 
-    public boolean verifyConnection() {
+    public boolean verifyRemoteCluster() {
         if (address == null) {
             return false;
         }
@@ -156,7 +156,9 @@ public class FlinkCluster implements Serializable {
                 return false;
             }
             try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
+                String overviewUrl = url + "/overview";
+                String result = HttpClientUtils.httpGetRequest(overviewUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                JacksonUtils.read(result, Overview.class);
                 return true;
             } catch (Exception ignored) {
                 //
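
Note on the entity change: the connectivity check in verifyRemoteCluster() is stricter than the old verifyConnection().
Instead of issuing a GET against the configured address alone, it now calls the Flink REST endpoint <address>/overview
and requires the body to deserialize into an Overview bean. A rough standalone equivalent of that probe, sketched with
the JDK HttpClient and Jackson rather than the project's HttpClientUtils / JacksonUtils wrappers (the local Overview
stand-in below is illustrative):

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.time.Duration;

    public final class RemoteClusterProbe {

        private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // Illustrative stand-in for the console's Overview bean: only a subset of fields is needed,
        // because the point of the check is merely that the body parses as cluster-overview JSON.
        public static class Overview {
            public Integer taskmanagers;
        }

        // Returns true only if <address>/overview answers within 2s with JSON that maps onto Overview.
        public static boolean verifyRemoteCluster(String address) {
            try {
                HttpRequest request = HttpRequest.newBuilder(URI.create(address + "/overview"))
                    .timeout(Duration.ofSeconds(2))
                    .GET()
                    .build();
                HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
                MAPPER.readValue(response.body(), Overview.class);
                return true;
            } catch (Exception ignored) {
                return false;
            }
        }
    }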
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 2be56cb3e..ecf775994 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -26,15 +26,15 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
     ResponseResult check(FlinkCluster flinkCluster);
 
-    ResponseResult create(FlinkCluster flinkCluster);
+    Boolean create(FlinkCluster flinkCluster);
 
-    ResponseResult delete(FlinkCluster flinkCluster);
+    void delete(FlinkCluster flinkCluster);
 
-    ResponseResult update(FlinkCluster flinkCluster);
+    void update(FlinkCluster flinkCluster);
 
-    ResponseResult start(FlinkCluster flinkCluster);
+    void start(FlinkCluster flinkCluster);
 
-    ResponseResult shutdown(FlinkCluster flinkCluster);
+    void shutdown(FlinkCluster flinkCluster);
 
     Boolean existsByClusterId(String clusterId, Long id);
 
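Note on the interface change: create() now reports success as a plain Boolean, while delete / update / start / shutdown
return void and signal failure by throwing, so callers move from status-code checks to try/catch. An illustrative caller
under the new contract (this class is not part of the commit):

    import org.apache.streampark.console.base.exception.ApiAlertException;
    import org.apache.streampark.console.base.exception.ApiDetailException;
    import org.apache.streampark.console.core.entity.FlinkCluster;
    import org.apache.streampark.console.core.service.FlinkClusterService;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative caller of the reworked service contract.
    public class ClusterRestarter {

        private static final Logger LOG = LoggerFactory.getLogger(ClusterRestarter.class);

        public void restart(FlinkClusterService service, FlinkCluster cluster) {
            try {
                service.shutdown(cluster);  // void: failure is signalled by an exception, not a status code
                service.start(cluster);
            } catch (ApiAlertException e) {
                // user-correctable problems (unsupported execution mode, blank clusterId, ...)
                LOG.warn("cluster restart rejected: {}", e.getMessage());
            } catch (ApiDetailException e) {
                // unexpected failures carrying stack-trace detail
                LOG.error("cluster restart failed", e);
            }
        }
    }
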
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 8217aee40..d35731509 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -105,7 +106,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
         // 3) Check connection
         if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
-            if (!cluster.verifyConnection()) {
+            if (!cluster.verifyRemoteCluster()) {
                 result.setMsg("the remote cluster connection failed, please check!");
                 result.setStatus(3);
                 return result;
@@ -123,8 +124,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     @Override
-    public ResponseResult create(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public Boolean create(FlinkCluster flinkCluster) {
         flinkCluster.setUserId(commonService.getUserId());
         flinkCluster.setCreateTime(new Date());
         if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
@@ -138,20 +138,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         } else {
             flinkCluster.setClusterState(ClusterState.CREATED.getValue());
         }
-        try {
-            save(flinkCluster);
-            result.setStatus(1);
-        } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("create cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-        }
-        return result;
+        return save(flinkCluster);
     }
 
     @Override
     @Transactional(rollbackFor = {Exception.class})
-    public ResponseResult start(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void start(FlinkCluster flinkCluster) {
         LambdaUpdateWrapper<FlinkCluster> updateWrapper = Wrappers.lambdaUpdate();
         updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
         try {
@@ -170,9 +162,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                         flinkCluster.getK8sRestExposedTypeEnum());
                     break;
                 default:
-                    result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
-                    result.setStatus(0);
-                    return result;
+                    throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + " can't start!");
             }
             FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
             DeployRequest deployRequest = new DeployRequest(
@@ -186,49 +176,35 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             Future<DeployResponse> future = executorService.submit(() -> FlinkSubmitter.deploy(deployRequest));
             DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
             if (null != deployResponse) {
-                if (deployResponse.message() == null) {
-                    updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
-                    updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
-                    updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
-                    updateWrapper.set(FlinkCluster::getException, null);
-                    update(updateWrapper);
-                    result.setStatus(1);
-                    FlinkTrackingTask.removeFlinkCluster(flinkCluster);
-                } else {
-                    result.setStatus(0);
-                    result.setMsg("deploy cluster failed," + deployResponse.message());
-                }
+                updateWrapper.set(FlinkCluster::getAddress, deployResponse.address());
+                updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId());
+                updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue());
+                updateWrapper.set(FlinkCluster::getException, null);
+                update(updateWrapper);
+                FlinkTrackingTask.removeFlinkCluster(flinkCluster);
             } else {
-                result.setStatus(0);
-                result.setMsg("deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+                throw new ApiAlertException("deploy cluster failed, unknown reason, please check your params or the StreamPark error log");
             }
-            return result;
         } catch (Exception e) {
             log.error(e.getMessage(), e);
             updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
-            result.setStatus(0);
             throw new ApiDetailException(e);
         }
     }
 
     @Override
-    public ResponseResult update(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void update(FlinkCluster flinkCluster) {
         try {
             updateById(flinkCluster);
-            result.setStatus(1);
         } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+            throw new ApiDetailException("update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
         }
-        return result;
     }
 
     @Override
-    public ResponseResult shutdown(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void shutdown(FlinkCluster flinkCluster) {
         ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
         String clusterId = flinkCluster.getClusterId();
         KubernetesDeployParam kubernetesDeployParam = null;
@@ -245,14 +221,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                     flinkCluster.getK8sRestExposedTypeEnum());
                 break;
             default:
-                result.setMsg("the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!");
-                result.setStatus(0);
-                return result;
+                throw new ApiAlertException("the ExecutionModeEnum " + executionModeEnum.getName() + " can't shutdown!");
         }
         if (StringUtils.isBlank(clusterId)) {
-            result.setMsg("the clusterId is Empty!");
-            result.setStatus(0);
-            return result;
+            throw new ApiAlertException("the clusterId cannot be empty!");
         }
         FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
         ShutDownRequest stopRequest = new ShutDownRequest(
@@ -270,19 +242,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             if (null != shutDownResponse) {
                 updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STOPED.getValue());
                 update(updateWrapper);
-                result.setStatus(1);
-                return result;
+            } else {
+                throw new ApiAlertException("the clusterId does not exist!");
             }
-            result.setStatus(1);
-            result.setMsg("clusterId is not exists!");
-            return result;
         } catch (Exception e) {
             log.error(e.getMessage(), e);
             updateWrapper.set(FlinkCluster::getException, e.toString());
             update(updateWrapper);
-            result.setStatus(0);
-            result.setMsg("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-            return result;
+            throw new ApiDetailException("shutdown cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
         }
     }
 
@@ -297,24 +264,23 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     @Override
-    public ResponseResult delete(FlinkCluster flinkCluster) {
-        ResponseResult result = new ResponseResult();
+    public void delete(FlinkCluster flinkCluster) {
         if (StringUtils.isNoneBlank(flinkCluster.getClusterId())
             && ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())
             && !ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
-            result = shutdown(flinkCluster);
-            if (0 == result.getStatus()) {
-                return result;
+            try {
+                shutdown(flinkCluster);
+            } catch (Exception e) {
+                if (e instanceof ApiDetailException) {
+                    throw e;
+                } else if (e instanceof ApiAlertException) {
+                    throw new ApiAlertException("shutdown cluster failed: " + e.getMessage());
+                } else {
+                    throw new ApiDetailException("shutdown cluster failed: " + ExceptionUtils.getStackTrace(e));
+                }
             }
         }
-        try {
-            removeById(flinkCluster.getId());
-            result.setStatus(1);
-        } catch (Exception e) {
-            result.setStatus(0);
-            result.setMsg("delete cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
-        }
-        return result;
+        removeById(flinkCluster.getId());
     }
 
 }
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
index f67f0cbe7..3a001023e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue
@@ -53,7 +53,7 @@
         const status = parseInt(res.status);
         if (status === 0) {
           const resp = await fetchCreateCluster(params);
-          if (resp.status) {
+          if (resp) {
             Swal.fire({
               icon: 'success',
               title: values.clusterName.concat(' create successful!'),
@@ -62,7 +62,7 @@
             });
             go('/flink/setting?activeKey=cluster');
           } else {
-            Swal.fire(resp.msg);
+            Swal.fire('Failed', 'create cluster failed, please check the log', 'error');
           }
         } else {
           Swal.fire('Failed', res.msg, 'error');
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
index 32836455e..fc1c11676 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue
@@ -67,18 +67,14 @@
         const res = await fetchCheckCluster(params);
         const status = parseInt(res.status);
         if (status === 0) {
-          const resp = await fetchUpdateCluster(params);
-          if (resp.status) {
-            Swal.fire({
-              icon: 'success',
-              title: values.clusterName.concat(' update successful!'),
-              showConfirmButton: false,
-              timer: 2000,
-            });
-            go('/flink/setting?activeKey=cluster');
-          } else {
-            Swal.fire(resp.data.msg);
-          }
+          await fetchUpdateCluster(params);
+          Swal.fire({
+            icon: 'success',
+            title: values.clusterName.concat(' update successful!'),
+            showConfirmButton: false,
+            timer: 2000,
+          });
+          go('/flink/setting?activeKey=cluster');
         } else {
           Swal.fire('Failed', res.msg, 'error');
         }
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
index 74698ad46..b6d4acbc5 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts
@@ -371,7 +371,6 @@ export const useClusterSetting = () => {
   function handleSubmitParams(values: Recordable) {
     const options = handleFormValue(values);
     const params = {
-      clusterId: values.clusterId || null,
       clusterName: values.clusterName,
       executionMode: values.executionMode,
       versionId: values.versionId,
@@ -386,6 +385,7 @@ export const useClusterSetting = () => {
       case ExecModeEnum.YARN_SESSION:
         if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) {
           Object.assign(params, {
+            clusterId: values.clusterId,
             address: values.address,
           });
         } else {
@@ -399,6 +399,7 @@ export const useClusterSetting = () => {
         return params;
       case ExecModeEnum.KUBERNETES_SESSION:
         Object.assign(params, {
+          clusterId: values.clusterId,
           options: JSON.stringify(options),
           dynamicProperties: values.dynamicProperties,
           resolveOrder: values.resolveOrder,
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index f352732da..800de3059 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -17,6 +17,4 @@
 
 package org.apache.streampark.flink.submit.bean
 
-case class DeployResponse(address: String,
-                          clusterId: String,
-                          message: String = null)
+case class DeployResponse(address: String, clusterId: String)