You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2024/04/15 13:44:25 UTC

(incubator-streampark) 02/02: [Improve] ThreadPoolExecutor param improvement

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit aba9eb6b4ad1ab98f4e95226637d12d336fd4431
Author: benjobs <be...@gmail.com>
AuthorDate: Mon Apr 15 21:44:04 2024 +0800

    [Improve] ThreadPoolExecutor param improvement
---
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../flink/kubernetes/ChangeEventBus.scala          | 12 +++++++---
 .../flink/kubernetes/KubernetesRetriever.scala     | 27 ++++++++++++----------
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index bcacd21ba..f63f21b37 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -104,7 +104,7 @@ public class FlinkK8sWatcherWrapper {
     }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(app -> FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+        .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
         .map(this::toTrackId)
         .collect(Collectors.toList());
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
index e7d8d3404..ea1749564 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
@@ -17,18 +17,24 @@
 
 package org.apache.streampark.flink.kubernetes
 
+import org.apache.streampark.common.util.ThreadUtils
+
 import com.google.common.eventbus.{AsyncEventBus, EventBus}
 
 import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 
 class ChangeEventBus {
 
+  private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
+
   private val execPool = new ThreadPoolExecutor(
-    Runtime.getRuntime.availableProcessors * 5,
-    Runtime.getRuntime.availableProcessors * 10,
+    CPU_NUM,
+    CPU_NUM * 5,
     60L,
     TimeUnit.SECONDS,
-    new LinkedBlockingQueue[Runnable](1024))
+    new LinkedBlockingQueue[Runnable],
+    ThreadUtils.threadFactory("streampark-k8s-watching-thread")
+  )
 
   private[kubernetes] val asyncEventBus =
     new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 47bfd62ea..da6870f17 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -87,18 +87,21 @@ object KubernetesRetriever extends Logger {
     val clientFactory: ClusterClientFactory[String] =
       clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
 
-    val clusterProvider: KubernetesClusterDescriptor =
-      clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
-
-    Try {
-      clusterProvider
-        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
-        .getClusterClient
-    } match {
-      case Success(v) => Some(v)
-      case Failure(e) =>
-        logError(s"Get flinkClient error, the error is: $e")
-        None
+    Utils.using(
+      clientFactory
+        .createClusterDescriptor(flinkConfig)
+        .asInstanceOf[KubernetesClusterDescriptor]) {
+      clusterProvider =>
+        Try {
+          clusterProvider
+            .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+            .getClusterClient
+        } match {
+          case Success(v) => Some(v)
+          case Failure(e) =>
+            logError(s"Get flinkClient error, the error is: $e")
+            None
+        }
     }
   }