You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/22 11:23:45 UTC
[inlong] branch master updated: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b1fd956e0 [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
b1fd956e0 is described below
commit b1fd956e0c966700b842ff938e690130d65815ee
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Sep 22 19:23:40 2022 +0800
[INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
---
.../tubemq/manager/service/TopicBackendWorker.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
index aaed365bf..4563a50b0 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
@@ -17,6 +17,9 @@
package org.apache.inlong.tubemq.manager.service;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.tubemq.manager.repository.TopicRepository;
import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
@@ -48,6 +51,7 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
new ConcurrentHashMap<>();
private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
private final NodeService nodeService;
+ private final ScheduledExecutorService workerExecutor;
@Autowired
private TopicRepository topicRepository;
@@ -66,10 +70,11 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
private int queueMaxRunningSize;
TopicBackendWorker() {
- Thread thread = new Thread(this);
- // daemon thread
- thread.setDaemon(true);
- thread.start();
+ ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+ this.workerExecutor = Executors
+ .newSingleThreadScheduledExecutor(
+ factoryBuilder.setNameFormat("tubemq-manager-topic-backend-worker").build());
+ workerExecutor.schedule(this, queueThreadInterval, TimeUnit.SECONDS);
nodeService = new NodeServiceImpl(this);
}
@@ -121,11 +126,10 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
@Override
public void run() {
log.info("TopicBackendWorker has started");
- while (runFlag.get()) {
+ if (runFlag.get()) {
try {
batchAddTopic();
checkTopicFromDB();
- TimeUnit.SECONDS.sleep(queueThreadInterval);
} catch (Exception exception) {
log.warn("exception caught", exception);
}
@@ -136,5 +140,6 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
public void destroy() throws Exception {
runFlag.set(false);
nodeService.close();
+ this.workerExecutor.shutdown();
}
}