You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/20 12:41:17 UTC
[inlong] branch master updated: [INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cddfda3d4 [INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)
cddfda3d4 is described below
commit cddfda3d40e0c1667f808af4cd44b0bad5f0e745
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 20 20:41:12 2022 +0800
[INLONG-5946][TubeMQ] AbstractDaemonService implementation optimization (#5947)
---
.../corebase/daemon/AbstractDaemonService.java | 47 ++++++++++++++++------
.../server/broker/offset/DefaultOffsetManager.java | 2 +-
.../server/broker/offset/OffsetRecordService.java | 3 +-
.../server/broker/stats/TrafficStatsService.java | 8 ++--
.../master/utils/SimpleVisitTokenManager.java | 28 ++++++++-----
5 files changed, 57 insertions(+), 31 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index 99bde0ed3..edad936e7 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -17,11 +17,8 @@
package org.apache.inlong.tubemq.corebase.daemon;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,26 +30,45 @@ public abstract class AbstractDaemonService implements Service, Runnable {
private final Thread daemon;
private final AtomicBoolean shutdown =
new AtomicBoolean(false);
- private final ScheduledExecutorService processorExecutor;
public AbstractDaemonService(final String serviceName, final long intervalMs) {
this.name = serviceName;
this.intervalMs = intervalMs;
this.daemon = new Thread(this, serviceName + "-daemon-thread");
this.daemon.setDaemon(true);
- this.processorExecutor = Executors
- .newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("tubemq-core-loop-processor"));
}
@Override
public void run() {
- logger.info(new StringBuilder(256).append(name)
+ StringBuilder strBuff =
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+ logger.info(strBuff.append(name)
.append("-daemon-thread started").toString());
- processorExecutor.schedule(this::loopProcess, intervalMs, TimeUnit.MILLISECONDS);
+ strBuff.delete(0, strBuff.length());
+ // process daemon task
+ while (!isStopped()) {
+ try {
+ Thread.sleep(intervalMs);
+ loopProcess(strBuff);
+ } catch (InterruptedException e) {
+ strBuff.delete(0, strBuff.length());
+ logger.warn(strBuff.append(name)
+ .append("-daemon-thread thread has been interrupted").toString());
+ strBuff.delete(0, strBuff.length());
+ return;
+ } catch (Throwable t) {
+ strBuff.delete(0, strBuff.length());
+ logger.error(strBuff.append(name)
+ .append("-daemon-thread throw a exception").toString(), t);
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ logger.info(strBuff.append(name)
+ .append("-daemon-thread stopped").toString());
+ strBuff.delete(0, strBuff.length());
}
- protected abstract void loopProcess();
+ protected abstract void loopProcess(StringBuilder strBuff);
@Override
public void start() {
@@ -72,7 +88,14 @@ public abstract class AbstractDaemonService implements Service, Runnable {
if (this.shutdown.compareAndSet(false, true)) {
logger.info(new StringBuilder(256).append(name)
.append("-daemon-thread closing ......").toString());
- this.processorExecutor.shutdown();
+ try {
+ if (this.daemon != null) {
+ this.daemon.interrupt();
+ this.daemon.join();
+ }
+ } catch (Throwable e) {
+ //
+ }
logger.info(new StringBuilder(256).append(name)
.append("-daemon-thread stopped!").toString());
return false;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index 3b46460aa..cb467528c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -65,7 +65,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
@Override
- protected void loopProcess() {
+ protected void loopProcess(StringBuilder strBuff) {
try {
commitCfmOffsets(false);
} catch (Throwable t) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 5d3d8aaba..2f55d4f14 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -53,9 +53,8 @@ public class OffsetRecordService extends AbstractDaemonService {
}
@Override
- protected void loopProcess() {
+ protected void loopProcess(StringBuilder strBuff) {
try {
- StringBuilder strBuff = new StringBuilder(2048);
// get group offset information
storeRecord2LocalTopic(strBuff);
} catch (Throwable throwable) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index fded7ef0d..f308e637e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -67,12 +67,10 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
}
@Override
- protected void loopProcess() {
+ protected void loopProcess(StringBuilder strBuff) {
try {
- // Snapshot metric data
- int befIndex = writableIndex.getAndIncrement();
- // Output 2 file
- output2file(befIndex);
+ // Snapshot metric data and output data to file
+ output2file(writableIndex.getAndIncrement());
} catch (Throwable throwable) {
logger.error("[Traffic Stats] Daemon commit thread throw error ", throwable);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index 6ab91635c..afa95194c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.server.master.utils;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.inlong.tubemq.server.master.MasterConfig;
@@ -31,16 +32,12 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
private final AtomicLong validVisitAuthorized = new AtomicLong(0);
private final AtomicLong freshVisitAuthorized = new AtomicLong(0);
private String brokerVisitTokens = "";
- private StringBuilder strBuilder = new StringBuilder(256);
public SimpleVisitTokenManager(final MasterConfig masterConfig) {
super("[VisitToken Manager]", (masterConfig.getVisitTokenValidPeriodMs() * 4) / 5);
this.masterConfig = masterConfig;
- freshVisitAuthorized.set(System.currentTimeMillis());
- validVisitAuthorized.set(freshVisitAuthorized.get());
- brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
- .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
- strBuilder.delete(0, strBuilder.length());
+ buildVisitTokens(true,
+ new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE));
super.start();
}
@@ -57,12 +54,9 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
}
@Override
- protected void loopProcess() {
+ protected void loopProcess(StringBuilder strBuff) {
try {
- validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
- brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
- .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
- strBuilder.delete(0, strBuilder.length());
+ buildVisitTokens(false, strBuff);
} catch (Throwable t) {
logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
}
@@ -75,4 +69,16 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
logger.info("[VisitToken Manager] VisitToken Manager service stopped!");
}
+ private void buildVisitTokens(boolean initial, StringBuilder strBuff) {
+ if (initial) {
+ freshVisitAuthorized.set(System.currentTimeMillis());
+ validVisitAuthorized.set(freshVisitAuthorized.get());
+ } else {
+ validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
+ }
+ brokerVisitTokens = strBuff.append(validVisitAuthorized.get())
+ .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
+ strBuff.delete(0, strBuff.length());
+ }
+
}