You are viewing a plain text version of this content. The canonical link to the original message is not included in this plain text copy.
Posted to commits@streampark.apache.org by be...@apache.org on 2024/04/15 13:44:25 UTC
(incubator-streampark) 02/02: [Improve] ThreadPoolExecutor param improvement
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit aba9eb6b4ad1ab98f4e95226637d12d336fd4431
Author: benjobs <be...@gmail.com>
AuthorDate: Mon Apr 15 21:44:04 2024 +0800
[Improve] ThreadPoolExecutor param improvement
---
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 +-
.../flink/kubernetes/ChangeEventBus.scala | 12 +++++++---
.../flink/kubernetes/KubernetesRetriever.scala | 27 ++++++++++++----------
3 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index bcacd21ba..f63f21b37 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -104,7 +104,7 @@ public class FlinkK8sWatcherWrapper {
}
// filter out the application that should be tracking
return k8sApplication.stream()
- .filter(app -> FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+ .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
.map(this::toTrackId)
.collect(Collectors.toList());
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
index e7d8d3404..ea1749564 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ChangeEventBus.scala
@@ -17,18 +17,24 @@
package org.apache.streampark.flink.kubernetes
+import org.apache.streampark.common.util.ThreadUtils
+
import com.google.common.eventbus.{AsyncEventBus, EventBus}
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
class ChangeEventBus {
+ private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
+
private val execPool = new ThreadPoolExecutor(
- Runtime.getRuntime.availableProcessors * 5,
- Runtime.getRuntime.availableProcessors * 10,
+ CPU_NUM,
+ CPU_NUM * 5,
60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue[Runnable](1024))
+ new LinkedBlockingQueue[Runnable],
+ ThreadUtils.threadFactory("streampark-k8s-watching-thread")
+ )
private[kubernetes] val asyncEventBus =
new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 47bfd62ea..da6870f17 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -87,18 +87,21 @@ object KubernetesRetriever extends Logger {
val clientFactory: ClusterClientFactory[String] =
clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
- val clusterProvider: KubernetesClusterDescriptor =
- clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
-
- Try {
- clusterProvider
- .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
- .getClusterClient
- } match {
- case Success(v) => Some(v)
- case Failure(e) =>
- logError(s"Get flinkClient error, the error is: $e")
- None
+ Utils.using(
+ clientFactory
+ .createClusterDescriptor(flinkConfig)
+ .asInstanceOf[KubernetesClusterDescriptor]) {
+ clusterProvider =>
+ Try {
+ clusterProvider
+ .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+ .getClusterClient
+ } match {
+ case Success(v) => Some(v)
+ case Failure(e) =>
+ logError(s"Get flinkClient error, the error is: $e")
+ None
+ }
}
}