You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:50:57 UTC
[rocketmq] 02/18: Create ConcurrentReputMessageService and replace native thread with ServiceThread
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b92a29e135c1a1b58351761d9621d5c3524f1ec7
Author: nowinkey <no...@tom.com>
AuthorDate: Fri Jan 20 02:03:24 2023 +0800
Create ConcurrentReputMessageService and replace native thread with ServiceThread
---
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +
.../apache/rocketmq/store/DefaultMessageStore.java | 409 +++++++++++++--------
2 files changed, 259 insertions(+), 160 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 23307ab03..9bf615f61 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -394,6 +394,8 @@ public class BrokerConfig extends BrokerIdentity {
private long channelExpiredTimeout = 1000 * 120;
private long subscriptionExpiredTimeout = 1000 * 60 * 10;
+ private int batchDispatchRequestThreadPoolNums = 16;
+
/**
* Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL.
*/
@@ -1646,4 +1648,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setEstimateAccumulation(boolean estimateAccumulation) {
this.estimateAccumulation = estimateAccumulation;
}
+
+ public int getBatchDispatchRequestThreadPoolNums() {
+ return batchDispatchRequestThreadPoolNums;
+ }
+
+ public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) {
+ this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index a4af44222..10f54a36f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;
@@ -134,6 +135,10 @@ public class DefaultMessageStore implements MessageStore {
private ReputMessageService reputMessageService;
+ private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+ private DispatchService dispatchService;
+
private HAService haService;
// CompactionLog
@@ -189,6 +194,14 @@ public class DefaultMessageStore implements MessageStore {
private int maxDelayLevel;
+ private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+ private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+ private int dispatchRequestOrderlyQueueSize = 16;
+
+ private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -225,7 +238,13 @@ public class DefaultMessageStore implements MessageStore {
}
}
- this.reputMessageService = new ReputMessageService();
+ if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+ this.reputMessageService = new ReputMessageService();
+ } else {
+ this.reputMessageService = new ConcurrentReputMessageService();
+ this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+ this.dispatchService = new DispatchService();
+ }
this.transientStorePool = new TransientStorePool(this);
@@ -361,6 +380,11 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
this.reputMessageService.start();
+ if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+ this.mainBatchDispatchRequestService.start();
+ this.dispatchService.start();
+ }
+
// Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
// which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
this.doRecheckReputOffsetFromCq();
@@ -458,6 +482,12 @@ public class DefaultMessageStore implements MessageStore {
}
this.commitLog.shutdown();
this.reputMessageService.shutdown();
+ if (mainBatchDispatchRequestService != null) {
+ mainBatchDispatchRequestService.shutdown();
+ }
+ if (dispatchService != null) {
+ dispatchService.shutdown();
+ }
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
@@ -675,7 +705,12 @@ public class DefaultMessageStore implements MessageStore {
this.recoverTopicQueueTable();
- this.reputMessageService = new ReputMessageService();
+ if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+ this.reputMessageService = new ReputMessageService();
+ } else {
+ this.reputMessageService = new ConcurrentReputMessageService();
+ }
+
this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
this.reputMessageService.start();
}
@@ -2646,110 +2681,7 @@ public class DefaultMessageStore implements MessageStore {
class ReputMessageService extends ServiceThread {
- private volatile long reputFromOffset = 0;
-
- private int batchId = 0;
-
- private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
-
- private static final int BATCH_SIZE = 1024 * 1024 * 4;
-
- private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
-
- private int dispatchRequestOrderlyQueueSize = 16;
-
- private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
-
- private int batchDispatchRequestThreadPoolNums = 16;
-
- private ExecutorService batchDispatchRequestExecutor;
-
- public ReputMessageService() {
- if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
- initExecutorService();
- startBatchDispatchRequestService();
- }
- }
-
- private void initExecutorService() {
- batchDispatchRequestExecutor = new ThreadPoolExecutor(
- this.batchDispatchRequestThreadPoolNums,
- this.batchDispatchRequestThreadPoolNums,
- 1000 * 60,
- TimeUnit.MICROSECONDS,
- new LinkedBlockingDeque<>(),
- new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
- }
-
- private void startBatchDispatchRequestService() {
- new Thread(() -> {
- while (true) {
- if (!batchDispatchRequestQueue.isEmpty()) {
- BatchDispatchRequest task = batchDispatchRequestQueue.poll();
- batchDispatchRequestExecutor.execute(() -> {
- ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
- tmpByteBuffer.position(task.position);
- tmpByteBuffer.limit(task.position + task.size);
- List<DispatchRequest> dispatchRequestList = new ArrayList<>();
- while (tmpByteBuffer.hasRemaining()) {
- DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
- if (dispatchRequest.isSuccess()) {
- dispatchRequestList.add(dispatchRequest);
- } else {
- LOGGER.error("[BUG]read total count not equals msg total size.");
- }
- }
- this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
- mappedPageHoldCount.getAndDecrement();
- });
- } else {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }, "MainBatchDispatchRequestServiceThread").start();
-
- new Thread(() -> {
- List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
- while (true) {
- dispatchRequestsList.clear();
- dispatchRequestOrderlyQueue.get(dispatchRequestsList);
- if (!dispatchRequestsList.isEmpty()) {
- for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
- for (DispatchRequest dispatchRequest : dispatchRequests) {
- DefaultMessageStore.this.doDispatch(dispatchRequest);
- // wake up long-polling
- if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
- && DefaultMessageStore.this.messageArrivingListener != null) {
- DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
- dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
- notifyMessageArrive4MultiQueue(dispatchRequest);
- }
- if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
- DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
- DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
- DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .add(dispatchRequest.getMsgSize());
- }
- }
- }
- } else {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }, "DispatchServiceThread").start();
- }
+ public volatile long reputFromOffset = 0;
public long getReputFromOffset() {
return reputFromOffset;
@@ -2781,14 +2713,14 @@ public class DefaultMessageStore implements MessageStore {
return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
}
- private boolean isCommitLogAvailable() {
+ public boolean isCommitLogAvailable() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
}
return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset();
}
- private void doReput() {
+ public void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
@@ -2865,7 +2797,187 @@ public class DefaultMessageStore implements MessageStore {
}
}
- private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+ private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
+ Map<String, String> prop = dispatchRequest.getPropertiesMap();
+ if (prop == null) {
+ return;
+ }
+ String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+ if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) {
+ return;
+ }
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ if (queues.length != queueOffsets.length) {
+ return;
+ }
+ for (int i = 0; i < queues.length; i++) {
+ String queueName = queues[i];
+ long queueOffset = Long.parseLong(queueOffsets[i]);
+ int queueId = dispatchRequest.getQueueId();
+ if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+ queueId = 0;
+ }
+ DefaultMessageStore.this.messageArrivingListener.arriving(
+ queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(),
+ dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+ }
+ }
+
+ @Override
+ public void run() {
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ this.doReput();
+ } catch (Exception e) {
+ DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ public String getServiceName() {
+ if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+ return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName();
+ }
+ return ReputMessageService.class.getSimpleName();
+ }
+
+ }
+
+ class MainBatchDispatchRequestService extends ServiceThread {
+
+ private final ExecutorService batchDispatchRequestExecutor;
+
+ public MainBatchDispatchRequestService() {
+ batchDispatchRequestExecutor = new ThreadPoolExecutor(
+ DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+ DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MICROSECONDS,
+ new LinkedBlockingQueue<>(1024),
+ new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+ }
+
+ private void pollBatchDispatchRequest() {
+ if (!batchDispatchRequestQueue.isEmpty()) {
+ BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+ batchDispatchRequestExecutor.execute(() -> {
+ ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+ tmpByteBuffer.position(task.position);
+ tmpByteBuffer.limit(task.position + task.size);
+ List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+ while (tmpByteBuffer.hasRemaining()) {
+ DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+ if (dispatchRequest.isSuccess()) {
+ dispatchRequestList.add(dispatchRequest);
+ } else {
+ LOGGER.error("[BUG]read total count not equals msg total size.");
+ }
+ }
+ dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+ mappedPageHoldCount.getAndDecrement();
+ });
+ }
+ }
+
+ @Override
+ public void run() {
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ pollBatchDispatchRequest();
+ } catch (Exception e) {
+ DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ public String getServiceName() {
+ if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+ return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+ }
+ return MainBatchDispatchRequestService.class.getSimpleName();
+ }
+
+ }
+
+ class DispatchService extends ServiceThread {
+
+ private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+ private void dispatch() {
+ dispatchRequestsList.clear();
+ dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+ if (!dispatchRequestsList.isEmpty()) {
+ for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+ for (DispatchRequest dispatchRequest : dispatchRequests) {
+ DefaultMessageStore.this.doDispatch(dispatchRequest);
+ // wake up long-polling
+ if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+ && DefaultMessageStore.this.messageArrivingListener != null) {
+ DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+ dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+ dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+ dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+ DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+ }
+ if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+ DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+ DefaultMessageStore.this.storeStatsService
+ .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+ DefaultMessageStore.this.storeStatsService
+ .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+ .add(dispatchRequest.getMsgSize());
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ dispatch();
+ } catch (Exception e) {
+ DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ public String getServiceName() {
+ if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+ return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+ }
+ return DispatchService.class.getSimpleName();
+ }
+ }
+
+ class ConcurrentReputMessageService extends ReputMessageService {
+
+ private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+ private int batchId = 0;
+
+ public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
if (position < 0) {
return;
}
@@ -2874,7 +2986,8 @@ public class DefaultMessageStore implements MessageStore {
batchDispatchRequestQueue.offer(task);
}
- private void doReputConcurrently() {
+ @Override
+ public void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
@@ -2896,6 +3009,8 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
ByteBuffer byteBuffer = result.getByteBuffer();
+ byteBuffer.mark();
+
int totalSize = byteBuffer.getInt();
if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
doNext = false;
@@ -2914,9 +3029,11 @@ public class DefaultMessageStore implements MessageStore {
doNext = false;
}
+ byteBuffer.reset();
+
if (totalSize > 0) {
if (batchDispatchRequestStart == -1) {
- batchDispatchRequestStart = byteBuffer.position() - 8;
+ batchDispatchRequestStart = byteBuffer.position();
batchDispatchRequestSize = 0;
}
batchDispatchRequestSize += totalSize;
@@ -2925,7 +3042,7 @@ public class DefaultMessageStore implements MessageStore {
batchDispatchRequestStart = -1;
batchDispatchRequestSize = -1;
}
- byteBuffer.position(byteBuffer.position() + totalSize - 8);
+ byteBuffer.position(byteBuffer.position() + totalSize);
this.reputFromOffset += totalSize;
readSize += totalSize;
} else {
@@ -2938,80 +3055,52 @@ public class DefaultMessageStore implements MessageStore {
batchDispatchRequestSize = -1;
}
}
- } catch (Throwable e) {
- throw e;
} finally {
this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
- boolean over = this.mappedPageHoldCount.get() == 0;
+ boolean over = mappedPageHoldCount.get() == 0;
while (!over) {
try {
- Thread.sleep(1);
+ TimeUnit.MILLISECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
- over = this.mappedPageHoldCount.get() == 0;
+ over = mappedPageHoldCount.get() == 0;
}
result.release();
}
}
}
- private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
- Map<String, String> prop = dispatchRequest.getPropertiesMap();
- if (prop == null) {
- return;
- }
- String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
- if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) {
- return;
- }
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- if (queues.length != queueOffsets.length) {
- return;
- }
- for (int i = 0; i < queues.length; i++) {
- String queueName = queues[i];
- long queueOffset = Long.parseLong(queueOffsets[i]);
- int queueId = dispatchRequest.getQueueId();
- if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
- queueId = 0;
+ @Override
+ public void shutdown() {
+ for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException ignored) {
}
- DefaultMessageStore.this.messageArrivingListener.arriving(
- queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(),
- dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
+
+ if (this.isCommitLogAvailable()) {
+ LOGGER.warn("shutdown concurrentReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max" +
+ " offset={}, reputFromOffset={}", DefaultMessageStore.this.commitLog.getMaxOffset(),
+ this.reputFromOffset);
+ }
+
+ super.shutdown(); // was this.shutdown(): an unconditional self-call that recursed until StackOverflowError
}
@Override
public void run() {
- DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- try {
- Thread.sleep(1);
- if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
- this.doReput();
- } else {
- doReputConcurrently();
- }
- } catch (Exception e) {
- DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+ super.run();
}
@Override
public String getServiceName() {
if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
- return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName();
+ return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ConcurrentReputMessageService.class.getSimpleName();
}
- return ReputMessageService.class.getSimpleName();
+ return ConcurrentReputMessageService.class.getSimpleName();
}
-
}
@Override