You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:50:57 UTC

[rocketmq] 02/18: Create ConcurrentReputMessageService and replace native thread with ServiceThread

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit b92a29e135c1a1b58351761d9621d5c3524f1ec7
Author: nowinkey <no...@tom.com>
AuthorDate: Fri Jan 20 02:03:24 2023 +0800

    Create ConcurrentReputMessageService and replace native thread with ServiceThread
---
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../apache/rocketmq/store/DefaultMessageStore.java | 409 +++++++++++++--------
 2 files changed, 259 insertions(+), 160 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 23307ab03..9bf615f61 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -394,6 +394,8 @@ public class BrokerConfig extends BrokerIdentity {
     private long channelExpiredTimeout = 1000 * 120;
     private long subscriptionExpiredTimeout = 1000 * 60 * 10;
 
+    private int batchDispatchRequestThreadPoolNums = 16;
+
     /**
      * Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL.
      */
@@ -1646,4 +1648,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setEstimateAccumulation(boolean estimateAccumulation) {
         this.estimateAccumulation = estimateAccumulation;
     }
+
+    /**
+     * Returns the configured number of batch-dispatch worker threads.
+     *
+     * @return the current value of {@code batchDispatchRequestThreadPoolNums} (initialised to 16)
+     */
+    public int getBatchDispatchRequestThreadPoolNums() {
+        return this.batchDispatchRequestThreadPoolNums;
+    }
+
+    /**
+     * Replaces the configured number of batch-dispatch worker threads.
+     *
+     * @param batchDispatchRequestThreadPoolNums the new value; stored as-is, no validation is done here
+     */
+    public void setBatchDispatchRequestThreadPoolNums(final int batchDispatchRequestThreadPoolNums) {
+        this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index a4af44222..10f54a36f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutionException;
@@ -134,6 +135,10 @@ public class DefaultMessageStore implements MessageStore {
 
     private ReputMessageService reputMessageService;
 
+    private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+    private DispatchService dispatchService;
+
     private HAService haService;
 
     // CompactionLog
@@ -189,6 +194,14 @@ public class DefaultMessageStore implements MessageStore {
 
     private int maxDelayLevel;
 
+    private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+    private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+    private int dispatchRequestOrderlyQueueSize = 16;
+
+    private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -225,7 +238,13 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
-        this.reputMessageService = new ReputMessageService();
+        if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+            this.reputMessageService = new ReputMessageService();
+        } else {
+            this.reputMessageService = new ConcurrentReputMessageService();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
 
         this.transientStorePool = new TransientStorePool(this);
 
@@ -361,6 +380,11 @@ public class DefaultMessageStore implements MessageStore {
         this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
         this.reputMessageService.start();
 
+        if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
         // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
         // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
         this.doRecheckReputOffsetFromCq();
@@ -458,6 +482,12 @@ public class DefaultMessageStore implements MessageStore {
             }
             this.commitLog.shutdown();
             this.reputMessageService.shutdown();
+            if (mainBatchDispatchRequestService != null) {
+                mainBatchDispatchRequestService.shutdown();
+            }
+            if (dispatchService != null) {
+                dispatchService.shutdown();
+            }
             this.flushConsumeQueueService.shutdown();
             this.allocateMappedFileService.shutdown();
             this.storeCheckpoint.flush();
@@ -675,7 +705,12 @@ public class DefaultMessageStore implements MessageStore {
 
         this.recoverTopicQueueTable();
 
-        this.reputMessageService = new ReputMessageService();
+        if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+            this.reputMessageService = new ReputMessageService();
+        } else {
+            this.reputMessageService = new ConcurrentReputMessageService();
+        }
+
         this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
         this.reputMessageService.start();
     }
@@ -2646,110 +2681,7 @@ public class DefaultMessageStore implements MessageStore {
 
     class ReputMessageService extends ServiceThread {
 
-        private volatile long reputFromOffset = 0;
-
-        private int batchId = 0;
-
-        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
-
-        private static final int BATCH_SIZE = 1024 * 1024 * 4;
-
-        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
-
-        private int dispatchRequestOrderlyQueueSize = 16;
-
-        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
-
-        private int batchDispatchRequestThreadPoolNums = 16;
-
-        private ExecutorService batchDispatchRequestExecutor;
-
-        public ReputMessageService() {
-            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
-                initExecutorService();
-                startBatchDispatchRequestService();
-            }
-        }
-
-        private void initExecutorService() {
-            batchDispatchRequestExecutor = new ThreadPoolExecutor(
-                    this.batchDispatchRequestThreadPoolNums,
-                    this.batchDispatchRequestThreadPoolNums,
-                    1000 * 60,
-                    TimeUnit.MICROSECONDS,
-                    new LinkedBlockingDeque<>(),
-                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
-        }
-
-        private void startBatchDispatchRequestService() {
-            new Thread(() -> {
-                while (true) {
-                    if (!batchDispatchRequestQueue.isEmpty()) {
-                        BatchDispatchRequest task = batchDispatchRequestQueue.poll();
-                        batchDispatchRequestExecutor.execute(() -> {
-                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
-                            tmpByteBuffer.position(task.position);
-                            tmpByteBuffer.limit(task.position + task.size);
-                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
-                            while (tmpByteBuffer.hasRemaining()) {
-                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
-                                if (dispatchRequest.isSuccess()) {
-                                    dispatchRequestList.add(dispatchRequest);
-                                } else {
-                                    LOGGER.error("[BUG]read total count not equals msg total size.");
-                                }
-                            }
-                            this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
-                            mappedPageHoldCount.getAndDecrement();
-                        });
-                    } else {
-                        try {
-                            Thread.sleep(1);
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }, "MainBatchDispatchRequestServiceThread").start();
-
-            new Thread(() -> {
-                List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
-                while (true) {
-                    dispatchRequestsList.clear();
-                    dispatchRequestOrderlyQueue.get(dispatchRequestsList);
-                    if (!dispatchRequestsList.isEmpty()) {
-                        for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
-                            for (DispatchRequest dispatchRequest : dispatchRequests) {
-                                DefaultMessageStore.this.doDispatch(dispatchRequest);
-                                // wake up long-polling
-                                if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
-                                        && DefaultMessageStore.this.messageArrivingListener != null) {
-                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
-                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
-                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
-                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
-                                    notifyMessageArrive4MultiQueue(dispatchRequest);
-                                }
-                                if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
-                                        DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
-                                    DefaultMessageStore.this.storeStatsService
-                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
-                                    DefaultMessageStore.this.storeStatsService
-                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
-                                            .add(dispatchRequest.getMsgSize());
-                                }
-                            }
-                        }
-                    } else {
-                        try {
-                            Thread.sleep(1);
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }, "DispatchServiceThread").start();
-        }
+        public volatile long reputFromOffset = 0;
 
         public long getReputFromOffset() {
             return reputFromOffset;
@@ -2781,14 +2713,14 @@ public class DefaultMessageStore implements MessageStore {
             return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
         }
 
-        private boolean isCommitLogAvailable() {
+        public boolean isCommitLogAvailable() {
             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
                 return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
             }
             return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset();
         }
 
-        private void doReput() {
+        public void doReput() {
             if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                 LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                     this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
@@ -2865,7 +2797,187 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
-        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+        /**
+         * Notifies the message-arriving listener once for every additional queue listed in the
+         * multi-dispatch properties of {@code dispatchRequest}.
+         *
+         * <p>Nothing is done when the request has no property map, when either the queue-name or the
+         * queue-offset property is blank, or when the two lists do not have the same number of entries.
+         * The listener itself is not null-checked here; that is left to the caller.
+         *
+         * @param dispatchRequest the request whose multi-dispatch properties are inspected
+         * @throws NumberFormatException if one of the listed queue offsets is not a valid long
+         */
+        private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
+            final Map<String, String> properties = dispatchRequest.getPropertiesMap();
+            if (properties == null) {
+                return;
+            }
+
+            final String joinedQueueNames = properties.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+            final String joinedQueueOffsets = properties.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+            if (StringUtils.isBlank(joinedQueueNames) || StringUtils.isBlank(joinedQueueOffsets)) {
+                return;
+            }
+
+            final String[] queueNames = joinedQueueNames.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+            final String[] offsetTexts = joinedQueueOffsets.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+            if (queueNames.length != offsetTexts.length) {
+                return;
+            }
+
+            for (int index = 0; index < queueNames.length; index++) {
+                final String queueName = queueNames[index];
+                final long queueOffset = Long.parseLong(offsetTexts[index]);
+                final int requestQueueId = dispatchRequest.getQueueId();
+                // A queue recognised as LMQ is always addressed with queue id 0 when LMQ support is enabled.
+                final boolean addressAsLmq =
+                    DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName);
+                final int queueId = addressAsLmq ? 0 : requestQueueId;
+                DefaultMessageStore.this.messageArrivingListener.arriving(
+                    queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(),
+                    dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+            }
+        }
+
+        /**
+         * Service loop. Until the service reports that it is stopped, each pass sleeps for one
+         * millisecond and then calls {@link #doReput()}. An exception thrown by either step is
+         * logged at WARN level and the loop carries on with the next pass.
+         */
+        @Override
+        public void run() {
+            final String startedMessage = this.getServiceName() + " service started";
+            DefaultMessageStore.LOGGER.info(startedMessage);
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    this.doReput();
+                } catch (Exception e) {
+                    final String failureMessage = this.getServiceName() + " service has exception. ";
+                    DefaultMessageStore.LOGGER.warn(failureMessage, e);
+                }
+            }
+
+            final String endedMessage = this.getServiceName() + " service end";
+            DefaultMessageStore.LOGGER.info(endedMessage);
+        }
+
+        /**
+         * Returns the name of this service: the simple class name, prefixed with the broker
+         * identifier when the broker runs inside a broker container.
+         *
+         * @return the service name used in the log messages of this service
+         */
+        @Override
+        public String getServiceName() {
+            final String simpleName = ReputMessageService.class.getSimpleName();
+            return DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()
+                ? DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + simpleName
+                : simpleName;
+        }
+
+    }
+
+    /**
+     * Service thread that takes {@link BatchDispatchRequest}s from {@code batchDispatchRequestQueue}
+     * and hands each one to a fixed-size worker pool. A worker decodes the byte range of the batch
+     * into {@link DispatchRequest}s, publishes them to {@code dispatchRequestOrderlyQueue} under the
+     * batch id and then decrements {@code mappedPageHoldCount}.
+     */
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        /** Worker pool that decodes batches; sized from the broker configuration. */
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        /**
+         * Creates the worker pool with {@code getBatchDispatchRequestThreadPoolNums()} threads and a
+         * work queue bounded to 1024 entries.
+         */
+        public MainBatchDispatchRequestService() {
+            final int poolSize = DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums();
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    poolSize,
+                    poolSize,
+                    1000 * 60,
+                    // 1000 * 60 is a millisecond count (60 s); pairing it with MICROSECONDS would mean 60 ms.
+                    TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<>(1024),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        /**
+         * Submits the batch at the head of {@code batchDispatchRequestQueue}, if there is one, to the
+         * worker pool.
+         *
+         * <p>The batch is removed from the queue only after {@code execute} has accepted it. The
+         * pool's work queue is bounded and no rejection handler is configured, so {@code execute}
+         * can throw {@link java.util.concurrent.RejectedExecutionException} when the pool is
+         * saturated. Removing the batch first would drop it on rejection: its
+         * {@code mappedPageHoldCount} decrement would never run, and the reput loop that waits for
+         * that counter to reach zero would never finish. Leaving the batch queued lets the next pass
+         * retry it; the exception itself is logged by {@link #run()}.
+         */
+        private void pollBatchDispatchRequest() {
+            // NOTE(review): this thread is the only consumer of the queue in the visible code, so the
+            // element seen by peek() is the one poll() removes below - confirm no other poller exists.
+            final BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+            if (task == null) {
+                return;
+            }
+            batchDispatchRequestExecutor.execute(() -> decodeAndPublish(task));
+            batchDispatchRequestQueue.poll();
+        }
+
+        /**
+         * Decodes every message in the byte range described by {@code task} and publishes the
+         * successfully decoded requests to {@code dispatchRequestOrderlyQueue} under the batch id.
+         *
+         * @param task the batch to decode; its buffer is duplicated so the shared buffer's position
+         *             and limit are left untouched
+         */
+        private void decodeAndPublish(BatchDispatchRequest task) {
+            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+            tmpByteBuffer.position(task.position);
+            tmpByteBuffer.limit(task.position + task.size);
+            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+            while (tmpByteBuffer.hasRemaining()) {
+                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                if (dispatchRequest.isSuccess()) {
+                    dispatchRequestList.add(dispatchRequest);
+                } else {
+                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                }
+            }
+            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+            // Signals that this worker no longer reads from the mapped buffer of the batch.
+            mappedPageHoldCount.getAndDecrement();
+        }
+
+        /**
+         * Stops the service thread and then shuts the worker pool down, so that the pool threads
+         * do not stay alive after the store has been shut down. Batches that the pool has already
+         * accepted are still allowed to run.
+         */
+        @Override
+        public void shutdown() {
+            super.shutdown();
+            batchDispatchRequestExecutor.shutdown();
+        }
+
+        /**
+         * Service loop. Until the service is stopped, each pass sleeps for one millisecond and then
+         * submits at most one batch. Exceptions are logged at WARN level and the loop continues.
+         */
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * Returns the simple class name, prefixed with the broker identifier when the broker runs
+         * inside a broker container.
+         */
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    /**
+     * Service thread that drains {@code dispatchRequestOrderlyQueue} and, for every drained
+     * {@link DispatchRequest}, runs {@code doDispatch}, wakes long-polling listeners and, on a
+     * slave without duplication enabled, updates the per-topic put statistics.
+     */
+    class DispatchService extends ServiceThread {
+
+        /** Scratch list that is cleared and refilled on every pass of {@link #dispatch()}. */
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        /** Performs one pass: collects whatever the orderly queue hands out and processes it in order. */
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (dispatchRequestsList.isEmpty()) {
+                return;
+            }
+            for (DispatchRequest[] batch : dispatchRequestsList) {
+                for (DispatchRequest request : batch) {
+                    handle(request);
+                }
+            }
+        }
+
+        /**
+         * Dispatches a single request, then notifies the arriving listener and records slave
+         * statistics when the respective conditions hold.
+         *
+         * @param request the request to process
+         */
+        private void handle(DispatchRequest request) {
+            final DefaultMessageStore store = DefaultMessageStore.this;
+            store.doDispatch(request);
+
+            // wake up long-polling
+            final boolean notifyListener = store.brokerConfig.isLongPollingEnable()
+                    && store.messageArrivingListener != null;
+            if (notifyListener) {
+                store.messageArrivingListener.arriving(request.getTopic(),
+                        request.getQueueId(), request.getConsumeQueueOffset() + 1,
+                        request.getTagsCode(), request.getStoreTimestamp(),
+                        request.getBitMap(), request.getPropertiesMap());
+                store.reputMessageService.notifyMessageArrive4MultiQueue(request);
+            }
+
+            final boolean slaveWithoutDuplication = !store.getMessageStoreConfig().isDuplicationEnable()
+                    && store.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
+            if (slaveWithoutDuplication) {
+                store.storeStatsService
+                        .getSinglePutMessageTopicTimesTotal(request.getTopic()).add(1);
+                store.storeStatsService
+                        .getSinglePutMessageTopicSizeTotal(request.getTopic())
+                        .add(request.getMsgSize());
+            }
+        }
+
+        /**
+         * Service loop. Until the service is stopped, each pass sleeps for one millisecond and then
+         * runs {@link #dispatch()}. Exceptions are logged at WARN level and the loop continues.
+         */
+        @Override
+        public void run() {
+            final String startedMessage = this.getServiceName() + " service started";
+            DefaultMessageStore.LOGGER.info(startedMessage);
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    final String failureMessage = this.getServiceName() + " service has exception. ";
+                    DefaultMessageStore.LOGGER.warn(failureMessage, e);
+                }
+            }
+
+            final String endedMessage = this.getServiceName() + " service end";
+            DefaultMessageStore.LOGGER.info(endedMessage);
+        }
+
+        /**
+         * Returns the simple class name, prefixed with the broker identifier when the broker runs
+         * inside a broker container.
+         */
+        @Override
+        public String getServiceName() {
+            final String simpleName = DispatchService.class.getSimpleName();
+            return DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()
+                    ? DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + simpleName
+                    : simpleName;
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private int batchId = 0;
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
             if (position < 0) {
                 return;
             }
@@ -2874,7 +2986,8 @@ public class DefaultMessageStore implements MessageStore {
             batchDispatchRequestQueue.offer(task);
         }
 
-        private void doReputConcurrently() {
+        @Override
+        public void doReput() {
             if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                 LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                         this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
@@ -2896,6 +3009,8 @@ public class DefaultMessageStore implements MessageStore {
                     for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                         ByteBuffer byteBuffer = result.getByteBuffer();
 
+                        byteBuffer.mark();
+
                         int totalSize = byteBuffer.getInt();
                         if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
                             doNext = false;
@@ -2914,9 +3029,11 @@ public class DefaultMessageStore implements MessageStore {
                                 doNext = false;
                         }
 
+                        byteBuffer.reset();
+
                         if (totalSize > 0) {
                             if (batchDispatchRequestStart == -1) {
-                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestStart = byteBuffer.position();
                                 batchDispatchRequestSize = 0;
                             }
                             batchDispatchRequestSize += totalSize;
@@ -2925,7 +3042,7 @@ public class DefaultMessageStore implements MessageStore {
                                 batchDispatchRequestStart = -1;
                                 batchDispatchRequestSize = -1;
                             }
-                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            byteBuffer.position(byteBuffer.position() + totalSize);
                             this.reputFromOffset += totalSize;
                             readSize += totalSize;
                         } else {
@@ -2938,80 +3055,52 @@ public class DefaultMessageStore implements MessageStore {
                             batchDispatchRequestSize = -1;
                         }
                     }
-                } catch (Throwable e) {
-                    throw e;
                 } finally {
                     this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
-                    boolean over = this.mappedPageHoldCount.get() == 0;
+                    boolean over = mappedPageHoldCount.get() == 0;
                     while (!over) {
                         try {
-                            Thread.sleep(1);
+                            TimeUnit.MILLISECONDS.sleep(1);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
-                        over = this.mappedPageHoldCount.get() == 0;
+                        over = mappedPageHoldCount.get() == 0;
                     }
                     result.release();
                 }
             }
         }
 
-        private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
-            Map<String, String> prop = dispatchRequest.getPropertiesMap();
-            if (prop == null) {
-                return;
-            }
-            String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
-            String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
-            if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) {
-                return;
-            }
-            String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-            String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-            if (queues.length != queueOffsets.length) {
-                return;
-            }
-            for (int i = 0; i < queues.length; i++) {
-                String queueName = queues[i];
-                long queueOffset = Long.parseLong(queueOffsets[i]);
-                int queueId = dispatchRequest.getQueueId();
-                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
-                    queueId = 0;
+        /**
+         * Shuts this service down after giving outstanding dispatch work a chance to finish.
+         *
+         * <p>Polls {@link #isCommitLogAvailable()} up to 50 times with a 100 ms pause between
+         * checks, logs a warning if it still returns true afterwards, and then delegates to the
+         * superclass {@code shutdown()} to stop the service thread.
+         */
+        @Override
+        public void shutdown() {
+            for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException ignored) {
                 }
-                DefaultMessageStore.this.messageArrivingListener.arriving(
-                    queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(),
-                    dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
             }
+
+            if (this.isCommitLogAvailable()) {
+                LOGGER.warn("shutdown concurrentReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max" +
+                                " offset={}, reputFromOffset={}", DefaultMessageStore.this.commitLog.getMaxOffset(),
+                        this.reputFromOffset);
+            }
+
+            // Must be super.shutdown(): this.shutdown() would re-enter this override and recurse
+            // until StackOverflowError, so the service thread would never be stopped.
+            super.shutdown();
         }
 
         @Override
         public void run() {
-            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
-
-            while (!this.isStopped()) {
-                try {
-                    Thread.sleep(1);
-                    if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
-                        this.doReput();
-                    } else {
-                        doReputConcurrently();
-                    }
-                } catch (Exception e) {
-                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
-                }
-            }
-
-            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+            super.run(); // reuses the ReputMessageService loop; its this.doReput() call resolves to the doReput() override in this class
         }
 
         @Override
         public String getServiceName() {
             if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
-                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName();
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ConcurrentReputMessageService.class.getSimpleName(); // broker identifier + class name when running in a broker container
             }
-            return ReputMessageService.class.getSimpleName();
+            return ConcurrentReputMessageService.class.getSimpleName(); // plain class name otherwise
         }
-
     }
 
     @Override