You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/20 12:41:17 UTC

[inlong] branch master updated: [INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cddfda3d4 [INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)
cddfda3d4 is described below

commit cddfda3d40e0c1667f808af4cd44b0bad5f0e745
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 20 20:41:12 2022 +0800

    [INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)
---
 .../corebase/daemon/AbstractDaemonService.java     | 47 ++++++++++++++++------
 .../server/broker/offset/DefaultOffsetManager.java |  2 +-
 .../server/broker/offset/OffsetRecordService.java  |  3 +-
 .../server/broker/stats/TrafficStatsService.java   |  8 ++--
 .../master/utils/SimpleVisitTokenManager.java      | 28 ++++++++-----
 5 files changed, 57 insertions(+), 31 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index 99bde0ed3..edad936e7 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -17,11 +17,8 @@
 
 package org.apache.inlong.tubemq.corebase.daemon;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,26 +30,45 @@ public abstract class AbstractDaemonService implements Service, Runnable {
     private final Thread daemon;
     private final AtomicBoolean shutdown =
             new AtomicBoolean(false);
-    private final ScheduledExecutorService processorExecutor;
 
     public AbstractDaemonService(final String serviceName, final long intervalMs) {
         this.name = serviceName;
         this.intervalMs = intervalMs;
         this.daemon = new Thread(this, serviceName + "-daemon-thread");
         this.daemon.setDaemon(true);
-        this.processorExecutor = Executors
-                .newSingleThreadScheduledExecutor(
-                        new DefaultThreadFactory("tubemq-core-loop-processor"));
     }
 
     @Override
     public void run() {
-        logger.info(new StringBuilder(256).append(name)
+        StringBuilder strBuff =
+                new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+        logger.info(strBuff.append(name)
                 .append("-daemon-thread started").toString());
-        processorExecutor.schedule(this::loopProcess, intervalMs, TimeUnit.MILLISECONDS);
+        strBuff.delete(0, strBuff.length());
+        // process daemon task
+        while (!isStopped()) {
+            try {
+                Thread.sleep(intervalMs);
+                loopProcess(strBuff);
+            } catch (InterruptedException e) {
+                strBuff.delete(0, strBuff.length());
+                logger.warn(strBuff.append(name)
+                        .append("-daemon-thread thread has been interrupted").toString());
+                strBuff.delete(0, strBuff.length());
+                return;
+            } catch (Throwable t) {
+                strBuff.delete(0, strBuff.length());
+                logger.error(strBuff.append(name)
+                        .append("-daemon-thread throw a exception").toString(), t);
+                strBuff.delete(0, strBuff.length());
+            }
+        }
+        logger.info(strBuff.append(name)
+                .append("-daemon-thread stopped").toString());
+        strBuff.delete(0, strBuff.length());
     }
 
-    protected abstract void loopProcess();
+    protected abstract void loopProcess(StringBuilder strBuff);
 
     @Override
     public void start() {
@@ -72,7 +88,14 @@ public abstract class AbstractDaemonService implements Service, Runnable {
         if (this.shutdown.compareAndSet(false, true)) {
             logger.info(new StringBuilder(256).append(name)
                     .append("-daemon-thread closing ......").toString());
-            this.processorExecutor.shutdown();
+            try {
+                if (this.daemon != null) {
+                    this.daemon.interrupt();
+                    this.daemon.join();
+                }
+            } catch (Throwable e) {
+                //
+            }
             logger.info(new StringBuilder(256).append(name)
                     .append("-daemon-thread stopped!").toString());
             return false;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index 3b46460aa..cb467528c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -65,7 +65,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     @Override
-    protected void loopProcess() {
+    protected void loopProcess(StringBuilder strBuff) {
         try {
             commitCfmOffsets(false);
         } catch (Throwable t) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 5d3d8aaba..2f55d4f14 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -53,9 +53,8 @@ public class OffsetRecordService extends AbstractDaemonService {
     }
 
     @Override
-    protected void loopProcess() {
+    protected void loopProcess(StringBuilder strBuff) {
         try {
-            StringBuilder strBuff = new StringBuilder(2048);
             // get group offset information
             storeRecord2LocalTopic(strBuff);
         } catch (Throwable throwable) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index fded7ef0d..f308e637e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -67,12 +67,10 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
     }
 
     @Override
-    protected void loopProcess() {
+    protected void loopProcess(StringBuilder strBuff) {
         try {
-            // Snapshot metric data
-            int befIndex = writableIndex.getAndIncrement();
-            // Output 2 file
-            output2file(befIndex);
+            // Snapshot metric data and output data to file
+            output2file(writableIndex.getAndIncrement());
         } catch (Throwable throwable) {
             logger.error("[Traffic Stats] Daemon commit thread throw error ", throwable);
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index 6ab91635c..afa95194c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.tubemq.server.master.utils;
 
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
 import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.inlong.tubemq.server.master.MasterConfig;
@@ -31,16 +32,12 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
     private final AtomicLong validVisitAuthorized = new AtomicLong(0);
     private final AtomicLong freshVisitAuthorized = new AtomicLong(0);
     private String brokerVisitTokens = "";
-    private StringBuilder strBuilder = new StringBuilder(256);
 
     public SimpleVisitTokenManager(final MasterConfig masterConfig) {
         super("[VisitToken Manager]", (masterConfig.getVisitTokenValidPeriodMs() * 4) / 5);
         this.masterConfig = masterConfig;
-        freshVisitAuthorized.set(System.currentTimeMillis());
-        validVisitAuthorized.set(freshVisitAuthorized.get());
-        brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
-            .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
-        strBuilder.delete(0, strBuilder.length());
+        buildVisitTokens(true,
+                new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE));
         super.start();
     }
 
@@ -57,12 +54,9 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
     }
 
     @Override
-    protected void loopProcess() {
+    protected void loopProcess(StringBuilder strBuff) {
         try {
-            validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
-            brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
-                    .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
-            strBuilder.delete(0, strBuilder.length());
+            buildVisitTokens(false, strBuff);
         }  catch (Throwable t) {
             logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
         }
@@ -75,4 +69,16 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
         logger.info("[VisitToken Manager] VisitToken Manager service stopped!");
     }
 
+    private void buildVisitTokens(boolean initial, StringBuilder strBuff) {
+        if (initial) {
+            freshVisitAuthorized.set(System.currentTimeMillis());
+            validVisitAuthorized.set(freshVisitAuthorized.get());
+        } else {
+            validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
+        }
+        brokerVisitTokens = strBuff.append(validVisitAuthorized.get())
+                .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
+        strBuff.delete(0, strBuff.length());
+    }
+
 }