You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/17 13:19:28 UTC
[inlong] branch master updated: [INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 536e8e6c3 [INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)
536e8e6c3 is described below
commit 536e8e6c3f99a9c68ac26663ef7c74d994b4f3e9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Sep 17 21:19:23 2022 +0800
[INLONG-5907][TubeMQ] Replace the while-sleep with ScheduledExecutorService for tube server loopProcess (#5908)
---
.../corebase/daemon/AbstractDaemonService.java | 23 +++++++++++-----------
.../server/broker/offset/DefaultOffsetManager.java | 16 +++++----------
.../server/broker/offset/OffsetRecordService.java | 21 +++++++-------------
.../server/broker/stats/TrafficStatsService.java | 22 ++++++++-------------
.../master/utils/SimpleVisitTokenManager.java | 20 +++++++------------
5 files changed, 38 insertions(+), 64 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index d99006a11..99bde0ed3 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -17,6 +17,10 @@
package org.apache.inlong.tubemq.corebase.daemon;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,24 +33,26 @@ public abstract class AbstractDaemonService implements Service, Runnable {
private final Thread daemon;
private final AtomicBoolean shutdown =
new AtomicBoolean(false);
+ private final ScheduledExecutorService processorExecutor;
public AbstractDaemonService(final String serviceName, final long intervalMs) {
this.name = serviceName;
this.intervalMs = intervalMs;
this.daemon = new Thread(this, serviceName + "-daemon-thread");
this.daemon.setDaemon(true);
+ this.processorExecutor = Executors
+ .newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("tubemq-core-loop-processor"));
}
@Override
public void run() {
logger.info(new StringBuilder(256).append(name)
.append("-daemon-thread started").toString());
- this.loopProcess(this.intervalMs);
- logger.info(new StringBuilder(256).append(name)
- .append("-daemon-thread stopped").toString());
+ processorExecutor.schedule(this::loopProcess, intervalMs, TimeUnit.MILLISECONDS);
}
- protected abstract void loopProcess(long intervalMs);
+ protected abstract void loopProcess();
@Override
public void start() {
@@ -66,14 +72,7 @@ public abstract class AbstractDaemonService implements Service, Runnable {
if (this.shutdown.compareAndSet(false, true)) {
logger.info(new StringBuilder(256).append(name)
.append("-daemon-thread closing ......").toString());
- try {
- if (this.daemon != null) {
- this.daemon.interrupt();
- this.daemon.join();
- }
- } catch (Throwable e) {
- //
- }
+ this.processorExecutor.shutdown();
logger.info(new StringBuilder(256).append(name)
.append("-daemon-thread stopped!").toString());
return false;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index d48936801..3b46460aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -65,17 +65,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
@Override
- protected void loopProcess(long intervalMs) {
- while (!super.isStopped()) {
- try {
- Thread.sleep(intervalMs);
- commitCfmOffsets(false);
- } catch (InterruptedException e) {
- logger.warn("[Offset Manager] Daemon commit thread has been interrupted");
- return;
- } catch (Throwable t) {
- logger.error("[Offset Manager] Daemon commit thread throw error ", t);
- }
+ protected void loopProcess() {
+ try {
+ commitCfmOffsets(false);
+ } catch (Throwable t) {
+ logger.error("[Offset Manager] Daemon commit thread throw error ", t);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 8d65ba8cb..5d3d8aaba 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -53,21 +53,14 @@ public class OffsetRecordService extends AbstractDaemonService {
}
@Override
- protected void loopProcess(long intervalMs) {
- StringBuilder strBuff = new StringBuilder(2048);
- logger.info("[Offset-Record Service] start offset-record service thread");
- while (!super.isStopped()) {
- try {
- Thread.sleep(intervalMs);
- // get group offset information
- storeRecord2LocalTopic(strBuff);
- } catch (InterruptedException e) {
- return;
- } catch (Throwable t) {
- //
- }
+ protected void loopProcess() {
+ try {
+ StringBuilder strBuff = new StringBuilder(2048);
+ // get group offset information
+ storeRecord2LocalTopic(strBuff);
+ } catch (Throwable throwable) {
+ logger.error("[Offset Record] Daemon commit thread throw error ", throwable);
}
- logger.info("[Offset-Record Service] exit offset-record service thread");
}
public void close() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index d7b43aa97..fded7ef0d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -67,20 +67,14 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
}
@Override
- protected void loopProcess(long intervalMs) {
- int befIndex;
- while (!super.isStopped()) {
- try {
- Thread.sleep(intervalMs);
- // Snapshot metric data
- befIndex = writableIndex.getAndIncrement();
- // Output 2 file
- output2file(befIndex);
- } catch (InterruptedException e) {
- return;
- } catch (Throwable t) {
- //
- }
+ protected void loopProcess() {
+ try {
+ // Snapshot metric data
+ int befIndex = writableIndex.getAndIncrement();
+ // Output 2 file
+ output2file(befIndex);
+ } catch (Throwable throwable) {
+ logger.error("[Traffic Stats] Daemon commit thread throw error ", throwable);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index cec3cf875..6ab91635c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -57,20 +57,14 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
}
@Override
- protected void loopProcess(long intervalMs) {
- while (!super.isStopped()) {
- try {
- Thread.sleep(intervalMs);
- validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
- brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
+ protected void loopProcess() {
+ try {
+ validVisitAuthorized.set(freshVisitAuthorized.getAndSet(System.currentTimeMillis()));
+ brokerVisitTokens = strBuilder.append(validVisitAuthorized.get())
.append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
- strBuilder.delete(0, strBuilder.length());
- } catch (InterruptedException e) {
- logger.warn("[VisitToken Manager] Daemon generator thread has been interrupted");
- return;
- } catch (Throwable t) {
- logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
- }
+ strBuilder.delete(0, strBuilder.length());
+ } catch (Throwable t) {
+ logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
}
}