You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by "RocMarshal (via GitHub)" <gi...@apache.org> on 2023/04/11 14:43:11 UTC

[GitHub] [incubator-streampark] RocMarshal commented on a diff in pull request #2541: [ISSUE-2498][Feature] [SubTask] The cluster supports remote and yarn session heartbeat monitoring

RocMarshal commented on code in PR #2541:
URL: https://github.com/apache/incubator-streampark/pull/2541#discussion_r1162880114


##########
streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java:
##########
@@ -24,12 +24,21 @@ public enum ClusterState implements Serializable {
   /** The cluster was just created but not started */
   CREATED(0),
   /** cluster started */
-  STARTED(1),
+  RUNNING(1),
   /** cluster stopped */
   STOPPED(2),
-
   /** cluster lost */
-  LOST(3);
+  LOST(3),
+  /** cluster undefined */
+  UNDEFINED(4),
+  /** cluster successfully */
+  SUCCEEDED(5),

Review Comment:
   These two enum values look redundant on the platform side.
   They could be used to represent the final status on Yarn, just on the platform side.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;

Review Comment:
   AnchorA: What about using `Long.MIN_VALUE` as the initial value?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -182,21 +187,25 @@ public void start(FlinkCluster cluster) {
           String address =
               YarnUtils.getRMWebAppURL() + "/proxy/" + deployResponse.clusterId() + "/";
           flinkCluster.setAddress(address);
+          flinkCluster.setJobManagerUrl(deployResponse.address());

Review Comment:
   ```suggestion
   ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(0);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
+  }
+
+  /** flinkcluster persistent */
+  @PreDestroy
+  private void stop() {
+    // TODO: flinkcluster persistent
+  }
+
+  @Scheduled(fixedDelay = 1000)
+  private void start() {
+    if (lastWatcheringTime == null) {
+      watcher();
+    } else if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL) {

Review Comment:
   ```suggestion
      if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL) {
   ```
   It could be adjusted this way based on `AnchorA`.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(0);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));

Review Comment:
   The expected state of some clusters is either stopped or abnormal, which should no longer require monitoring.
   What do you think about it?
   
   
   
   Based on the above questions, imagine a situation where Yarn session1 cluster is in a running state. After running for a period of time, the cluster state is FAILED, while Yarn session2 is a cluster that has been created but not started. However, at this point, the platform restarts. How can we ensure that after the restart, only Yarn session1 continues to be monitored and Yarn session2 is ignored?
   
   For example, we could introduce an `expectState` field to identify the expected state of the cluster, and only monitor the heartbeat of clusters whose expected state is RUNNING.
   
   I'd like to hear any better ideas for solving these cases.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(0);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
+  }
+
+  /** flinkcluster persistent */
+  @PreDestroy
+  private void stop() {
+    // TODO: flinkcluster persistent
+  }
+
+  @Scheduled(fixedDelay = 1000)
+  private void start() {
+    if (lastWatcheringTime == null) {
+      watcher();
+    } else if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL) {
+      watcher();
+    }
+  }
+
+  private void watcher() {
+    lastWatcheringTime = System.currentTimeMillis();
+    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+      EXECUTOR.execute(
+          () -> {
+            FlinkCluster flinkCluster = entry.getValue();
+            Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+            if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+              ClusterState state = getClusterState(flinkCluster);
+              handleClusterState(flinkCluster, state);
+            } else {
+              // TODO: K8s Session status monitoring
+            }
+          });
+    }
+  }
+
+  /**
+   * cluster get state from flink or yarn api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
+    if (ClusterState.isRunningState(state)) {
+      return state;
+    } else {
+      return getClusterStateFromYarnAPI(flinkCluster);
+    }
+  }
+
+  /**
+   * cluster get state from flink rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+    final String address = flinkCluster.getAddress();
+    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    if (StringUtils.isEmpty(address)) {
+      return ClusterState.STOPPED;
+    }
+    final String flinkUrl =
+        StringUtils.isEmpty(jobManagerUrl)
+            ? address.concat("/overview")
+            : jobManagerUrl.concat("/overview");

Review Comment:
   After a JM failover in Yarn session mode, the `jobManagerUrl` here will change, whereas the `address` is filled in based on the Yarn REST API and will not change after failover.
   
   So could we re-evaluate whether it is really necessary to introduce jobManagerUrl?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -182,21 +187,25 @@ public void start(FlinkCluster cluster) {
           String address =
               YarnUtils.getRMWebAppURL() + "/proxy/" + deployResponse.clusterId() + "/";
           flinkCluster.setAddress(address);
+          flinkCluster.setJobManagerUrl(deployResponse.address());
         } else {
           flinkCluster.setAddress(deployResponse.address());
         }
         flinkCluster.setClusterId(deployResponse.clusterId());

Review Comment:
   ```suggestion
           flinkCluster.setJobManagerUrl(deployResponse.address());
           flinkCluster.setClusterId(deployResponse.clusterId());
   ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;

Review Comment:
   ```suggestion
     private Long lastWatchingTime = Long.MIN_VALUE;
   ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;

Review Comment:
   What about `private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);`?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(0);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
+  }
+
+  /** flinkcluster persistent */
+  @PreDestroy
+  private void stop() {
+    // TODO: flinkcluster persistent
+  }
+
+  @Scheduled(fixedDelay = 1000)
+  private void start() {
+    if (lastWatcheringTime == null) {
+      watcher();
+    } else if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL) {
+      watcher();
+    }
+  }
+
+  private void watcher() {
+    lastWatcheringTime = System.currentTimeMillis();
+    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+      EXECUTOR.execute(
+          () -> {
+            FlinkCluster flinkCluster = entry.getValue();
+            Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+            if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+              ClusterState state = getClusterState(flinkCluster);
+              handleClusterState(flinkCluster, state);
+            } else {
+              // TODO: K8s Session status monitoring
+            }

Review Comment:
   What about splitting it into three methods, one for each of the three cases: `remote`, `y-session`, and `k8s-session`?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.YarnUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.metrics.flink.Overview;
+import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+@Slf4j
+@Component
+public class FlinkClusterWatcher {
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  private Long lastWatcheringTime = null;
+
+  // Track interval  every 30 seconds
+  private static final Long WATCHER_INTERVAL = 1000L * 30;
+
+  /** Watcher cluster lists */
+  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(0);
+
+  /** Thread pool for processing status monitoring for each cluster */
+  private static final ExecutorService EXECUTOR =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
+
+  /** Initialize cluster cache */
+  @PostConstruct
+  private void init() {
+    WATCHER_CLUSTERS.clear();
+    List<FlinkCluster> flinkClusters = flinkClusterService.list();
+    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
+  }
+
+  /** flinkcluster persistent */
+  @PreDestroy
+  private void stop() {
+    // TODO: flinkcluster persistent
+  }
+
+  @Scheduled(fixedDelay = 1000)
+  private void start() {
+    if (lastWatcheringTime == null) {
+      watcher();
+    } else if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL) {
+      watcher();
+    }
+  }
+
+  private void watcher() {
+    lastWatcheringTime = System.currentTimeMillis();
+    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
+      EXECUTOR.execute(
+          () -> {
+            FlinkCluster flinkCluster = entry.getValue();
+            Integer clusterExecutionMode = flinkCluster.getExecutionMode();
+            if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
+              ClusterState state = getClusterState(flinkCluster);
+              handleClusterState(flinkCluster, state);
+            } else {
+              // TODO: K8s Session status monitoring
+            }
+          });
+    }
+  }
+
+  /**
+   * cluster get state from flink or yarn api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
+    if (ClusterState.isRunningState(state)) {
+      return state;
+    } else {
+      return getClusterStateFromYarnAPI(flinkCluster);
+    }
+  }
+
+  /**
+   * cluster get state from flink rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+    final String address = flinkCluster.getAddress();
+    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    if (StringUtils.isEmpty(address)) {
+      return ClusterState.STOPPED;
+    }
+    final String flinkUrl =
+        StringUtils.isEmpty(jobManagerUrl)
+            ? address.concat("/overview")
+            : jobManagerUrl.concat("/overview");
+    try {
+      String res =
+          HttpClientUtils.httpGetRequest(
+              flinkUrl, RequestConfig.custom().setConnectTimeout(5000).build());
+
+      JacksonUtils.read(res, Overview.class);
+      return ClusterState.RUNNING;
+    } catch (Exception ignored) {
+      log.error("cluster id:{} get state from flink api failed", flinkCluster.getId());
+    }
+    return ClusterState.UNKNOWN;
+  }
+
+  /**
+   * cluster get state from yarn rest api
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      return ClusterState.STOPPED;
+    }
+    String clusterId = flinkCluster.getClusterId();
+    if (StringUtils.isEmpty(clusterId)) {
+      return ClusterState.STOPPED;
+    }
+    String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
+    try {
+      String result = YarnUtils.restRequest(yarnUrl);
+      if (null == result) {
+        return ClusterState.UNKNOWN;
+      }
+      YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
+      ClusterState state = ClusterState.of(yarnAppInfo.getApp().getFinalStatus());
+      if (ClusterState.UNDEFINED.equals(state)) {

Review Comment:
   The YARN ResourceManager REST API has special cases in which the application's `state` is ACCEPTED but its `finalStatus` is UNDEFINED.
   
   So it would be better to determine the current YARN session status from both the `state` and the `finalStatus` of the YARN application.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org