You are viewing a plain-text version of this content. The canonical link to it (a hyperlink labeled "here" on the original page) was lost in this plain-text rendering.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/07/15 12:56:37 UTC

[incubator-streampark] branch dev updated: [improvement] improvement flink cluster status update (#2858)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 501a47024 [improvement] improvement flink cluster status update (#2858)
501a47024 is described below

commit 501a47024c9d9e193316d3de9b84a34d380823fd
Author: xujiangfeng001 <10...@users.noreply.github.com>
AuthorDate: Sat Jul 15 20:56:32 2023 +0800

    [improvement] improvement flink cluster status update (#2858)
---
 .../streampark/common/enums/ClusterState.java      | 14 +++--
 .../console/core/service/FlinkClusterService.java  |  6 ++-
 .../core/service/impl/FlinkClusterServiceImpl.java | 26 +++++++--
 .../console/core/task/FlinkClusterWatcher.java     | 61 +++++++++++++++-------
 4 files changed, 79 insertions(+), 28 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index d9a715f7e..7794f7f95 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -26,11 +26,15 @@ public enum ClusterState implements Serializable {
   /** cluster started */
   RUNNING(1),
   /** cluster stopped */
-  STOPPED(2),
+  CANCELED(2),
   /** cluster lost */
   LOST(3),
   /** cluster unknown */
-  UNKNOWN(4);
+  UNKNOWN(4),
+  STARTING(5),
+  CANCELING(6),
+  FAILED(7),
+  KILLED(8);
 
   private final Integer value;
 
@@ -65,8 +69,10 @@ public enum ClusterState implements Serializable {
   }
 
   public static boolean isFailed(ClusterState state) {
-    return state == ClusterState.STOPPED
+    return state == ClusterState.FAILED
         || state == ClusterState.LOST
-        || state == ClusterState.UNKNOWN;
+        || state == ClusterState.UNKNOWN
+        || state == ClusterState.KILLED
+        || state == ClusterState.CANCELED;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 6d4559735..1c62806db 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -37,8 +37,12 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
   void update(FlinkCluster flinkCluster);
 
+  void starting(FlinkCluster flinkCluster);
+
   void start(FlinkCluster flinkCluster);
 
+  void canceling(FlinkCluster flinkCluster);
+
   void shutdown(FlinkCluster flinkCluster);
 
   Boolean existsByClusterId(String clusterId, Long id);
@@ -49,5 +53,5 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
 
-  void updateClusterFinalState(Long id, ClusterState state);
+  void updateClusterState(Long id, ClusterState state);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index ad6af6617..c5eba5b3f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -155,10 +155,18 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     return ret;
   }
 
+  @Override
+  public void starting(FlinkCluster flinkCluster) {
+    flinkCluster.setClusterState(ClusterState.STARTING.getValue());
+    flinkCluster.setStartTime(new Date());
+    updateById(flinkCluster);
+  }
+
   @Override
   @Transactional(rollbackFor = {Exception.class})
   public void start(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
+    starting(flinkCluster);
     try {
       DeployResponse deployResponse = deployInternal(flinkCluster);
       ApiAlertException.throwIfNull(
@@ -176,13 +184,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setClusterId(deployResponse.clusterId());
       flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
       flinkCluster.setException(null);
-      flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
       FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
-      flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+      flinkCluster.setClusterState(ClusterState.FAILED.getValue());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
       throw new ApiDetailException(e);
@@ -200,6 +207,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     flinkCluster.setDescription(paramOfCluster.getDescription());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
       flinkCluster.setAddress(paramOfCluster.getAddress());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+      flinkCluster.setStartTime(new Date());
+      flinkCluster.setEndTime(null);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     } else {
       flinkCluster.setClusterId(paramOfCluster.getClusterId());
       flinkCluster.setVersionId(paramOfCluster.getVersionId());
@@ -217,6 +228,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     updateById(flinkCluster);
   }
 
+  @Override
+  public void canceling(FlinkCluster flinkCluster) {
+    flinkCluster.setClusterState(ClusterState.CANCELING.getValue());
+    updateById(flinkCluster);
+  }
+
   @Override
   public void shutdown(FlinkCluster cluster) {
     FlinkCluster flinkCluster = this.getById(cluster.getId());
@@ -233,11 +250,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     ApiAlertException.throwIfTrue(
         existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
 
+    canceling(flinkCluster);
     try {
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
-      flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+      flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
       flinkCluster.setEndTime(new Date());
       FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
@@ -280,7 +298,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void updateClusterFinalState(Long id, ClusterState state) {
+  public void updateClusterState(Long id, ClusterState state) {
     LambdaUpdateWrapper<FlinkCluster> updateWrapper =
         new LambdaUpdateWrapper<FlinkCluster>()
             .eq(FlinkCluster::getId, id)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index ed545b0dd..b6351fd7f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -31,7 +31,7 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -112,7 +112,7 @@ public class FlinkClusterWatcher {
                   () -> {
                     ClusterState state = getClusterState(flinkCluster);
                     if (ClusterState.isFailed(state)) {
-                      flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+                      flinkClusterService.updateClusterState(flinkCluster.getId(), state);
                       unWatching(flinkCluster);
                       alert(flinkCluster, state);
                     }
@@ -161,15 +161,39 @@ public class FlinkClusterWatcher {
   }
 
   /**
-   * cluster get state from flink rest api
+   * get remote cluster state
    *
    * @param flinkCluster
    * @return
    */
   private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
+    return getStateFromFlinkRestApi(flinkCluster);
+  }
+
+  /**
+   * get yarn session cluster state
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
+    final ClusterState state = getStateFromFlinkRestApi(flinkCluster);
+    if (ClusterState.isFailed(state)) {
+      return getStateFromYarnRestApi(flinkCluster);
+    }
+    return state;
+  }
+
+  /**
+   * cluster get state from flink rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
     final String address = flinkCluster.getAddress();
     if (StringUtils.isEmpty(address)) {
-      return ClusterState.STOPPED;
+      return ClusterState.CANCELED;
     }
     final String jobManagerUrl = flinkCluster.getJobManagerUrl();
     final String flinkUrl =
@@ -195,7 +219,7 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
+  private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) {
     String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
     try {
       String result = YarnUtils.restRequest(yarnUrl);
@@ -203,15 +227,14 @@ public class FlinkClusterWatcher {
         return ClusterState.UNKNOWN;
       }
       YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
-      FinalApplicationStatus status =
-          stringConvertFinalApplicationStatus(yarnAppInfo.getApp().getFinalStatus());
+      YarnApplicationState status = stringConvertYarnState(yarnAppInfo.getApp().getState());
       if (status == null) {
         log.error(
             "cluster id:{} final application status convert failed, invalid string ",
             flinkCluster.getId());
         return ClusterState.UNKNOWN;
       }
-      return finalApplicationStatusConvertClusterState(status);
+      return yarnStateConvertClusterState(status);
     } catch (Exception e) {
       return ClusterState.LOST;
     }
@@ -238,30 +261,30 @@ public class FlinkClusterWatcher {
   }
 
   /**
-   * string converse final application status
+   * string converse yarn application state
    *
    * @param value
    * @return
    */
-  private FinalApplicationStatus stringConvertFinalApplicationStatus(String value) {
-    for (FinalApplicationStatus status : FinalApplicationStatus.values()) {
-      if (status.name().equals(value)) {
-        return status;
+  private YarnApplicationState stringConvertYarnState(String value) {
+    for (YarnApplicationState state : YarnApplicationState.values()) {
+      if (state.name().equals(value)) {
+        return state;
       }
     }
     return null;
   }
 
   /**
-   * final application status convert cluster state
+   * yarn application state convert cluster state
    *
-   * @param status
+   * @param state
    * @return
    */
-  private ClusterState finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
-    if (status == FinalApplicationStatus.UNDEFINED) {
-      return ClusterState.RUNNING;
+  private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
+    if (state == YarnApplicationState.FINISHED) {
+      return ClusterState.CANCELED;
     }
-    return ClusterState.STOPPED;
+    return ClusterState.of(state.toString());
   }
 }