You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/22 11:23:45 UTC

[inlong] branch master updated: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b1fd956e0 [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
b1fd956e0 is described below

commit b1fd956e0c966700b842ff938e690130d65815ee
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Sep 22 19:23:40 2022 +0800

    [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
---
 .../tubemq/manager/service/TopicBackendWorker.java      | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
index aaed365bf..4563a50b0 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.tubemq.manager.service;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.tubemq.manager.repository.TopicRepository;
 import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
@@ -48,6 +51,7 @@ public class TopicBackendWorker implements DisposableBean, Runnable  {
             new ConcurrentHashMap<>();
     private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
     private final NodeService nodeService;
+    private final ScheduledExecutorService workerExecutor;
 
     @Autowired
     private TopicRepository topicRepository;
@@ -66,10 +70,11 @@ public class TopicBackendWorker implements DisposableBean, Runnable  {
     private int queueMaxRunningSize;
 
     TopicBackendWorker() {
-        Thread thread = new Thread(this);
-        // daemon thread
-        thread.setDaemon(true);
-        thread.start();
+        ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+        this.workerExecutor = Executors
+                .newSingleThreadScheduledExecutor(
+                        factoryBuilder.setNameFormat("tubemq-manager-topic-backend-worker").build());
+        workerExecutor.schedule(this, queueThreadInterval, TimeUnit.SECONDS);
         nodeService = new NodeServiceImpl(this);
     }
 
@@ -121,11 +126,10 @@ public class TopicBackendWorker implements DisposableBean, Runnable  {
     @Override
     public void run() {
         log.info("TopicBackendWorker has started");
-        while (runFlag.get()) {
+        if (runFlag.get()) {
             try {
                 batchAddTopic();
                 checkTopicFromDB();
-                TimeUnit.SECONDS.sleep(queueThreadInterval);
             } catch (Exception exception) {
                 log.warn("exception caught", exception);
             }
@@ -136,5 +140,6 @@ public class TopicBackendWorker implements DisposableBean, Runnable  {
     public void destroy() throws Exception {
         runFlag.set(false);
         nodeService.close();
+        this.workerExecutor.shutdown();
     }
 }