You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/07/15 12:56:37 UTC
[incubator-streampark] branch dev updated: [improvement] improvement flink cluster status update (#2858)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 501a47024 [improvement] improvement flink cluster status update (#2858)
501a47024 is described below
commit 501a47024c9d9e193316d3de9b84a34d380823fd
Author: xujiangfeng001 <10...@users.noreply.github.com>
AuthorDate: Sat Jul 15 20:56:32 2023 +0800
[improvement] improvement flink cluster status update (#2858)
---
.../streampark/common/enums/ClusterState.java | 14 +++--
.../console/core/service/FlinkClusterService.java | 6 ++-
.../core/service/impl/FlinkClusterServiceImpl.java | 26 +++++++--
.../console/core/task/FlinkClusterWatcher.java | 61 +++++++++++++++-------
4 files changed, 79 insertions(+), 28 deletions(-)
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index d9a715f7e..7794f7f95 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -26,11 +26,15 @@ public enum ClusterState implements Serializable {
/** cluster started */
RUNNING(1),
/** cluster stopped */
- STOPPED(2),
+ CANCELED(2),
/** cluster lost */
LOST(3),
/** cluster unknown */
- UNKNOWN(4);
+ UNKNOWN(4),
+ STARTING(5),
+ CANCELING(6),
+ FAILED(7),
+ KILLED(8);
private final Integer value;
@@ -65,8 +69,10 @@ public enum ClusterState implements Serializable {
}
public static boolean isFailed(ClusterState state) {
- return state == ClusterState.STOPPED
+ return state == ClusterState.FAILED
|| state == ClusterState.LOST
- || state == ClusterState.UNKNOWN;
+ || state == ClusterState.UNKNOWN
+ || state == ClusterState.KILLED
+ || state == ClusterState.CANCELED;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 6d4559735..1c62806db 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -37,8 +37,12 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
void update(FlinkCluster flinkCluster);
+ void starting(FlinkCluster flinkCluster);
+
void start(FlinkCluster flinkCluster);
+ void canceling(FlinkCluster flinkCluster);
+
void shutdown(FlinkCluster flinkCluster);
Boolean existsByClusterId(String clusterId, Long id);
@@ -49,5 +53,5 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
- void updateClusterFinalState(Long id, ClusterState state);
+ void updateClusterState(Long id, ClusterState state);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index ad6af6617..c5eba5b3f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -155,10 +155,18 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
return ret;
}
+ @Override
+ public void starting(FlinkCluster flinkCluster) {
+ flinkCluster.setClusterState(ClusterState.STARTING.getValue());
+ flinkCluster.setStartTime(new Date());
+ updateById(flinkCluster);
+ }
+
@Override
@Transactional(rollbackFor = {Exception.class})
public void start(FlinkCluster cluster) {
FlinkCluster flinkCluster = getById(cluster.getId());
+ starting(flinkCluster);
try {
DeployResponse deployResponse = deployInternal(flinkCluster);
ApiAlertException.throwIfNull(
@@ -176,13 +184,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterId(deployResponse.clusterId());
flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
flinkCluster.setException(null);
- flinkCluster.setStartTime(new Date());
flinkCluster.setEndTime(null);
FlinkClusterWatcher.addWatching(flinkCluster);
updateById(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
- flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ flinkCluster.setClusterState(ClusterState.FAILED.getValue());
flinkCluster.setException(e.toString());
updateById(flinkCluster);
throw new ApiDetailException(e);
@@ -200,6 +207,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setDescription(paramOfCluster.getDescription());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setAddress(paramOfCluster.getAddress());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+ flinkCluster.setStartTime(new Date());
+ flinkCluster.setEndTime(null);
+ FlinkClusterWatcher.addWatching(flinkCluster);
} else {
flinkCluster.setClusterId(paramOfCluster.getClusterId());
flinkCluster.setVersionId(paramOfCluster.getVersionId());
@@ -217,6 +228,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
updateById(flinkCluster);
}
+ @Override
+ public void canceling(FlinkCluster flinkCluster) {
+ flinkCluster.setClusterState(ClusterState.CANCELING.getValue());
+ updateById(flinkCluster);
+ }
+
@Override
public void shutdown(FlinkCluster cluster) {
FlinkCluster flinkCluster = this.getById(cluster.getId());
@@ -233,11 +250,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfTrue(
existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
+ canceling(flinkCluster);
try {
// 4) shutdown
ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
- flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
flinkCluster.setEndTime(new Date());
FlinkClusterWatcher.unWatching(flinkCluster);
updateById(flinkCluster);
@@ -280,7 +298,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void updateClusterFinalState(Long id, ClusterState state) {
+ public void updateClusterState(Long id, ClusterState state) {
LambdaUpdateWrapper<FlinkCluster> updateWrapper =
new LambdaUpdateWrapper<FlinkCluster>()
.eq(FlinkCluster::getId, id)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index ed545b0dd..b6351fd7f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -31,7 +31,7 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -112,7 +112,7 @@ public class FlinkClusterWatcher {
() -> {
ClusterState state = getClusterState(flinkCluster);
if (ClusterState.isFailed(state)) {
- flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+ flinkClusterService.updateClusterState(flinkCluster.getId(), state);
unWatching(flinkCluster);
alert(flinkCluster, state);
}
@@ -161,15 +161,39 @@ public class FlinkClusterWatcher {
}
/**
- * cluster get state from flink rest api
+ * get remote cluster state
*
* @param flinkCluster
* @return
*/
private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
+ return getStateFromFlinkRestApi(flinkCluster);
+ }
+
+ /**
+ * get yarn session cluster state
+ *
+ * @param flinkCluster
+ * @return
+ */
+ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
+ final ClusterState state = getStateFromFlinkRestApi(flinkCluster);
+ if (ClusterState.isFailed(state)) {
+ return getStateFromYarnRestApi(flinkCluster);
+ }
+ return state;
+ }
+
+ /**
+ * get cluster state from flink rest api
+ *
+ * @param flinkCluster
+ * @return
+ */
+ private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
final String address = flinkCluster.getAddress();
if (StringUtils.isEmpty(address)) {
- return ClusterState.STOPPED;
+ return ClusterState.CANCELED;
}
final String jobManagerUrl = flinkCluster.getJobManagerUrl();
final String flinkUrl =
@@ -195,7 +219,7 @@ public class FlinkClusterWatcher {
* @param flinkCluster
* @return
*/
- private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
+ private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) {
String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
try {
String result = YarnUtils.restRequest(yarnUrl);
@@ -203,15 +227,14 @@ public class FlinkClusterWatcher {
return ClusterState.UNKNOWN;
}
YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
- FinalApplicationStatus status =
- stringConvertFinalApplicationStatus(yarnAppInfo.getApp().getFinalStatus());
+ YarnApplicationState status = stringConvertYarnState(yarnAppInfo.getApp().getState());
if (status == null) {
log.error(
"cluster id:{} final application status convert failed, invalid string ",
flinkCluster.getId());
return ClusterState.UNKNOWN;
}
- return finalApplicationStatusConvertClusterState(status);
+ return yarnStateConvertClusterState(status);
} catch (Exception e) {
return ClusterState.LOST;
}
@@ -238,30 +261,30 @@ public class FlinkClusterWatcher {
}
/**
- * string converse final application status
+ * convert a string to a yarn application state
*
* @param value
* @return
*/
- private FinalApplicationStatus stringConvertFinalApplicationStatus(String value) {
- for (FinalApplicationStatus status : FinalApplicationStatus.values()) {
- if (status.name().equals(value)) {
- return status;
+ private YarnApplicationState stringConvertYarnState(String value) {
+ for (YarnApplicationState state : YarnApplicationState.values()) {
+ if (state.name().equals(value)) {
+ return state;
}
}
return null;
}
/**
- * final application status convert cluster state
+ * convert a yarn application state to a cluster state
*
- * @param status
+ * @param state
* @return
*/
- private ClusterState finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
- if (status == FinalApplicationStatus.UNDEFINED) {
- return ClusterState.RUNNING;
+ private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
+ if (state == YarnApplicationState.FINISHED) {
+ return ClusterState.CANCELED;
}
- return ClusterState.STOPPED;
+ return ClusterState.of(state.toString());
}
}