You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/17 13:19:28 UTC

[inlong] branch master updated: [INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 536e8e6c3 [INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)
536e8e6c3 is described below

commit 536e8e6c3f99a9c68ac26663ef7c74d994b4f3e9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Sep 17 21:19:23 2022 +0800

    [INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)
---
 .../corebase/daemon/AbstractDaemonService.java     | 23 +++++++++++-----------
 .../server/broker/offset/DefaultOffsetManager.java | 16 +++++----------
 .../server/broker/offset/OffsetRecordService.java  | 21 +++++++-------------
 .../server/broker/stats/TrafficStatsService.java   | 22 ++++++++-------------
 .../master/utils/SimpleVisitTokenManager.java      | 20 +++++++------------
 5 files changed, 38 insertions(+), 64 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index d99006a11..99bde0ed3 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.tubemq.corebase.daemon;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,24 +33,26 @@ public abstract class AbstractDaemonService implements Service, Runnable {
     private final Thread daemon;
     private final AtomicBoolean shutdown =
             new AtomicBoolean(false);
+    private final ScheduledExecutorService processorExecutor;
 
     public AbstractDaemonService(final String serviceName, final long intervalMs) {
         this.name = serviceName;
         this.intervalMs = intervalMs;
         this.daemon = new Thread(this, serviceName + "-daemon-thread");
         this.daemon.setDaemon(true);
+        this.processorExecutor = Executors
+                .newSingleThreadScheduledExecutor(
+                        new DefaultThreadFactory("tubemq-core-loop-processor"));
     }
 
     @Override
     public void run() {
         logger.info(new StringBuilder(256).append(name)
                 .append("-daemon-thread started").toString());
-        this.loopProcess(this.intervalMs);
-        logger.info(new StringBuilder(256).append(name)
-                .append("-daemon-thread stopped").toString());
+        processorExecutor.schedule(this::loopProcess, intervalMs, TimeUnit.MILLISECONDS);
     }
 
-    protected abstract void loopProcess(long intervalMs);
+    protected abstract void loopProcess();
 
     @Override
     public void start() {
@@ -66,14 +72,7 @@ public abstract class AbstractDaemonService implements Service, Runnable {
         if (this.shutdown.compareAndSet(false, true)) {
             logger.info(new StringBuilder(256).append(name)
                     .append("-daemon-thread closing ......").toString());
-            try {
-                if (this.daemon != null) {
-                    this.daemon.interrupt();
-                    this.daemon.join();
-                }
-            } catch (Throwable e) {
-                //
-            }
+            this.processorExecutor.shutdown();
             logger.info(new StringBuilder(256).append(name)
                     .append("-daemon-thread stopped!").toString());
             return false;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index d48936801..3b46460aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -65,17 +65,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     @Override
-    protected void loopProcess(long intervalMs) {
-        while (!super.isStopped()) {
-            try {
-                Thread.sleep(intervalMs);
-                commitCfmOffsets(false);
-            } catch (InterruptedException e) {
-                logger.warn("[Offset Manager] Daemon commit thread has been interrupted");
-                return;
-            } catch (Throwable t) {
-                logger.error("[Offset Manager] Daemon commit thread throw error ", t);
-            }
+    protected void loopProcess() {
+        try {
+            commitCfmOffsets(false);
+        } catch (Throwable t) {
+            logger.error("[Offset Manager] Daemon commit thread throw error ", t);
         }
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 8d65ba8cb..5d3d8aaba 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -53,21 +53,14 @@ public class OffsetRecordService extends AbstractDaemonService {
     }
 
     @Override
-    protected void loopProcess(long intervalMs) {
-        StringBuilder strBuff = new StringBuilder(2048);
-        logger.info("[Offset-Record Service] start offset-record service thread");
-        while (!super.isStopped()) {
-            try {
-                Thread.sleep(intervalMs);
-                // get group offset information
-                storeRecord2LocalTopic(strBuff);
-            } catch (InterruptedException e) {
-                return;
-            } catch (Throwable t) {
-                //
-            }
+    protected void loopProcess() {
+        try {
+            StringBuilder strBuff = new StringBuilder(2048);
+            // get group offset information
+            storeRecord2LocalTopic(strBuff);
+        } catch (Throwable throwable) {
+            logger.error("[Offset Record] Daemon commit thread throw error ", throwable);
         }
-        logger.info("[Offset-Record Service] exit offset-record service thread");
     }
 
     public void close() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index d7b43aa97..fded7ef0d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -67,20 +67,14 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
     }
 
     @Override
-    protected void loopProcess(long intervalMs) {
-        int befIndex;
-        while (!super.isStopped()) {
-            try {
-                Thread.sleep(intervalMs);
-                // Snapshot metric data
-                befIndex = writableIndex.getAndIncrement();
-                // Output 2 file
-                output2file(befIndex);
-            } catch (InterruptedException e) {
-                return;
-            } catch (Throwable t) {
-                //
-            }
+    protected void loopProcess() {
+        try {
+            // Snapshot metric data
+            int befIndex = writableIndex.getAndIncrement();
+            // Output 2 file
+            output2file(befIndex);
+        } catch (Throwable throwable) {
+            logger.error("[Traffic Stats] Daemon commit thread throw error ", throwable);
         }
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index cec3cf875..6ab91635c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -57,20 +57,14 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
     }
 
     @Override
-    protected void loopProcess(long intervalMs) {
-        while (!super.isStopped()) {
-            try {
-                Thread.sleep(intervalMs);
-                validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
-                brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
+    protected void loopProcess() {
+        try {
+            validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
+            brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
                     .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
-                strBuilder.delete(0, strBuilder.length());
-            } catch (InterruptedException e) {
-                logger.warn("[VisitToken Manager] Daemon generator thread has been interrupted");
-                return;
-            } catch (Throwable t) {
-                logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
-            }
+            strBuilder.delete(0, strBuilder.length());
+        }  catch (Throwable t) {
+            logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
         }
     }