Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2023/01/15 17:18:48 UTC

[GitHub] [rocketmq] nowinkeyy opened a new pull request, #5887: [ISSUE #5884] Concurrent check CommitLog messages

nowinkeyy opened a new pull request, #5887:
URL: https://github.com/apache/rocketmq/pull/5887

   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   close #5884 
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following 5 checklist items (the last one is optional) before requesting a community review of your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic correction; prefer mocks where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1103922354


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();

Review Comment:
   Do you want to submit another commit to combine it into a method?





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096503352


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   Does it need a policy to temporarily save more than 1024 tasks? I can adjust it if necessary.





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097002744


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Is that ok?
   
   ![image](https://user-images.githubusercontent.com/72536832/216903825-95ec619c-2578-4233-a221-284b3b0af483.png)
   





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1102676645


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {

Review Comment:
   > The doReput method in ConcurrentReputMessageService is similar to the one in ReputMessageService. Is it possible to extract some code fragments so they can be reused?
   
   If I extracted a shared method, it would probably require changes to ReputMessageService, which I would rather avoid. I also don't think this part of the code has a strong need for reuse.





[GitHub] [rocketmq] guyinyou commented on pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#issuecomment-1421926603

   ![image](https://user-images.githubusercontent.com/36399867/217419869-015cda68-9118-4d69-8418-5910367604fd.png)
   
   After testing, it still meets expectations




[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104145056


##########
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java:
##########
@@ -392,11 +392,14 @@ public boolean isEnable() {
     private long channelExpiredTimeout = 1000 * 120;
     private long subscriptionExpiredTimeout = 1000 * 60 * 10;
 
+    private int batchDispatchRequestThreadPoolNums = 16;

Review Comment:
   Why not put this config in MessageStoreConfig?





[GitHub] [rocketmq] guyinyou commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104150310


##########
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java:
##########
@@ -392,11 +392,14 @@ public boolean isEnable() {
     private long channelExpiredTimeout = 1000 * 120;
     private long subscriptionExpiredTimeout = 1000 * 60 * 10;
 
+    private int batchDispatchRequestThreadPoolNums = 16;

Review Comment:
   > Why not put this config in MessageStoreConfig?
   
   A small detail, nicely spotted.





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096504705


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2610,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;

Review Comment:
   `ptr` would probably be better as a `long`.
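
To illustrate the suggestion, here is a minimal sketch of an ordered reassembly window whose cursor is a `long`. It is not the PR's `DispatchRequestOrderlyQueue`; the class name `OrderedWindowSketch`, its methods, and the blocking behaviour are assumptions made for illustration only. Batches may finish out of order, but they are only handed out in id order, and a producer waits if its id lies beyond the window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the code from the pull request.
final class OrderedWindowSketch<T> {
    private final Object[] slots;
    // Id of the next batch to hand out. A long cursor will not overflow in
    // practice, whereas an int cursor wraps after about 2^31 batches.
    private long next = 0;

    OrderedWindowSketch(int windowSize) {
        this.slots = new Object[windowSize];
    }

    /** Stores the result for batch id, waiting while id lies beyond the window. */
    synchronized void put(long id, T item) throws InterruptedException {
        while (id >= next + slots.length) {
            wait();
        }
        slots[(int) (id % slots.length)] = item;
    }

    /** Appends every consecutively completed batch, in id order, to out. */
    @SuppressWarnings("unchecked")
    synchronized void drainTo(List<T> out) {
        int idx = (int) (next % slots.length);
        while (slots[idx] != null) {
            out.add((T) slots[idx]);
            slots[idx] = null;
            next++;
            idx = (int) (next % slots.length);
        }
        // Wake producers whose ids may now fall inside the window.
        notifyAll();
    }

    /** Completes batches 2, 0, 1 in that order and returns them in id order. */
    static List<String> demo() {
        OrderedWindowSketch<String> queue = new OrderedWindowSketch<>(4);
        List<String> out = new ArrayList<>();
        try {
            queue.put(2, "c");
            queue.drainTo(out); // nothing yet: batch 0 is still missing
            queue.put(0, "a");
            queue.drainTo(out); // hands out "a" only
            queue.put(1, "b");
            queue.drainTo(out); // hands out "b", then "c"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The slot index is derived from the `long` id with a modulo, so only the cursor needs the wider type; the array itself stays small.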





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1098223451


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);

Review Comment:
   > I don't think CallerRunsPolicy works well here. When the calling thread executes run() itself, it blocks for a long time, and it stays blocked for some time even after the workQueue is idle again. With the current approach, the calling thread simply blocks and waits until the workQueue is idle.
   
   run() blocks because only requests within a window (default size 16) can be put into the dispatchRequestOrderlyQueue; requests beyond the window block until they can be placed.
   With CallerRunsPolicy, whenever the work queue is full the calling thread ends up executing run() for a request beyond the window, so it waits too long for run() to finish.
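
The alternative described above, blocking the submitter until the work queue has room instead of running the task on the calling thread, can be sketched roughly as follows. This is an illustrative sketch, not the handler from the PR: the names `BlockingSubmitSketch`, `BlockWhenFull`, and `runAll`, as well as the pool and queue sizes, are assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the code from the pull request.
final class BlockingSubmitSketch {

    /** Waits for queue space instead of running the rejected task on the caller. */
    static final class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                // Blocks the submitting thread until a worker frees a slot.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Pushes taskCount tasks through a 1-thread pool with a 2-slot queue; returns how many ran. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new BlockWhenFull());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100));
    }
}
```

With this handler the submitter resumes as soon as one queue slot is free, whereas with `CallerRunsPolicy` it would be tied up for the full duration of the task it was forced to run.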



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104340762


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   BATCH_SIZE could be assigned from MessageStoreConfig here; the value would then take effect only at broker startup.
   
   https://github.com/apache/rocketmq/blob/02577ed7230a3587cb04ba65abdd6870230c4d7e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L2970
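
   One way to read this suggestion is sketched below. Everything here is a stand-in: `StoreConfig` is a hypothetical substitute for MessageStoreConfig, the property name `batchDispatchRequestSize` is an assumption and not an existing option, and `ReputSketch` only mimics the size check from the diff. The point is that the limit is copied once in the constructor, so later config changes are ignored until a restart.

```java
public class BatchSizeConfigSketch {

    /** Hypothetical stand-in for MessageStoreConfig; the property name is an assumption. */
    static class StoreConfig {
        // Same default as the BATCH_SIZE constant in the diff: 4 MiB.
        private int batchDispatchRequestSize = 1024 * 1024 * 4;

        int getBatchDispatchRequestSize() {
            return batchDispatchRequestSize;
        }

        void setBatchDispatchRequestSize(int size) {
            this.batchDispatchRequestSize = size;
        }
    }

    /** Stand-in for the reput service: reads the limit once, at construction time. */
    static class ReputSketch {
        private final int batchSize;

        ReputSketch(StoreConfig config) {
            this.batchSize = config.getBatchDispatchRequestSize();
        }

        /** Mirrors the `batchDispatchRequestSize > BATCH_SIZE` check from the diff. */
        boolean shouldFlush(int accumulatedBytes) {
            return accumulatedBytes > batchSize;
        }
    }

    public static void main(String[] args) {
        StoreConfig config = new StoreConfig();
        config.setBatchDispatchRequestSize(1024);
        ReputSketch reput = new ReputSketch(config);
        config.setBatchDispatchRequestSize(4096); // changed after "startup": not picked up
        System.out.println(reput.shouldFlush(2048));
    }
}
```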
   
   
   
   





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1102517850


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {

Review Comment:
   The doReput method in ConcurrentReputMessageService is very similar to the one in ReputMessageService. Is it possible to extract the shared code fragments so that both classes reuse them?
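
   A rough sketch of one possible shape for that refactoring, using the template-method pattern. None of these names come from the PR: `BaseReput`, `onMessage`, `onScanFinished`, and the integer "message sizes" are simplifications invented for illustration. The scan loop lives in one place and the two services differ only in what they do with each message.

```java
import java.util.ArrayList;
import java.util.List;

public class ReputTemplateSketch {

    /** Owns the shared scan loop; subclasses only supply the per-message behaviour. */
    abstract static class BaseReput {
        final List<String> log = new ArrayList<>();

        /** Walks the message sizes once; a non-positive size stands for "nothing more to read". */
        final void doReput(int[] messageSizes) {
            for (int size : messageSizes) {
                if (size <= 0) {
                    break;
                }
                onMessage(size);
            }
            onScanFinished();
        }

        abstract void onMessage(int size);

        /** Optional hook, called once after the loop ends. */
        void onScanFinished() {
        }
    }

    /** Dispatches every message immediately, like the existing serial path. */
    static class SerialReput extends BaseReput {
        @Override
        void onMessage(int size) {
            log.add("dispatch:" + size);
        }
    }

    /** Accumulates messages and hands them off once the batch exceeds the limit. */
    static class BatchingReput extends BaseReput {
        private final int batchLimit;
        private int pending;

        BatchingReput(int batchLimit) {
            this.batchLimit = batchLimit;
        }

        @Override
        void onMessage(int size) {
            pending += size;
            if (pending > batchLimit) {
                flush();
            }
        }

        @Override
        void onScanFinished() {
            if (pending > 0) {
                flush(); // hand off the final partial batch
            }
        }

        private void flush() {
            log.add("batch:" + pending);
            pending = 0;
        }
    }

    public static void main(String[] args) {
        BatchingReput batching = new BatchingReput(10);
        batching.doReput(new int[] {4, 4, 4, 3});
        System.out.println(batching.log);
    }
}
```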



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();

Review Comment:
   It would be better to have a single method responsible for building the DispatchRequest, including the byteBuffer decoding, like checkMessageAndReturnSize.
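
A rough illustration of the suggestion above, not the PR's code: the sketch below moves the header decoding into one method that reads the size and magic code, validates them, and returns a small result object. `RecordDecoderSketch`, `DecodedRecord` and `decode` are hypothetical names, and the 8-byte header layout (total size followed by magic code) is assumed from the `getInt()` calls in the diff.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: one method owns the ByteBuffer decoding and builds the result.
final class RecordDecoderSketch {

    // Hypothetical stand-in for DispatchRequest, carrying only what the sketch decodes.
    static final class DecodedRecord {
        final boolean success;
        final int totalSize;
        final int magicCode;

        DecodedRecord(boolean success, int totalSize, int magicCode) {
            this.success = success;
            this.totalSize = totalSize;
            this.magicCode = magicCode;
        }
    }

    // Assumed header layout: 4-byte total size, then 4-byte magic code.
    static final int HEADER_SIZE = 8;

    // Decodes one record at the buffer's current position.
    // On success the position moves past the record; on failure it is left unchanged.
    static DecodedRecord decode(ByteBuffer buffer) {
        if (buffer.remaining() < HEADER_SIZE) {
            return new DecodedRecord(false, 0, 0);
        }
        int start = buffer.position();
        int totalSize = buffer.getInt();
        int magicCode = buffer.getInt();
        if (totalSize < HEADER_SIZE || start + totalSize > buffer.limit()) {
            buffer.position(start);
            return new DecodedRecord(false, 0, magicCode);
        }
        buffer.position(start + totalSize);
        return new DecodedRecord(true, totalSize, magicCode);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putInt(12).putInt(7).putInt(99);
        buffer.flip();
        DecodedRecord record = decode(buffer);
        System.out.println(record.success + " size=" + record.totalSize + " magic=" + record.magicCode);
    }
}
```

With a helper of this shape, the caller's loop only has to check the returned record instead of calling `mark()` and `getInt()` itself.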



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                    tmpByteBuffer.position(task.position);
+                    tmpByteBuffer.limit(task.position + task.size);
+                    List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                    while (tmpByteBuffer.hasRemaining()) {
+                        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                        if (dispatchRequest.isSuccess()) {
+                            dispatchRequestList.add(dispatchRequest);
+                        } else {
+                            LOGGER.error("[BUG]read total count not equals msg total size.");
+                        }
+                    }
+                    dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                    mappedPageHoldCount.getAndDecrement();
+                });
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private int batchId = 0;

Review Comment:
   It's recommended to use a long for batchId.
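
A small illustration of why the wider type matters, assuming the id is only ever incremented as in `batchId++`: an `int` counter wraps to a negative value after `Integer.MAX_VALUE`, which would presumably upset anything that orders batches by id, while a `long` keeps increasing. The class and method names are hypothetical.

```java
// Hypothetical demo of int wrap-around versus long for a monotonically increasing id.
final class BatchIdOverflowDemo {

    // Increments an int id; wraps around at Integer.MAX_VALUE.
    static int nextIntId(int current) {
        return current + 1;
    }

    // Increments a long id; no wrap-around in any realistic lifetime.
    static long nextLongId(long current) {
        return current + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextIntId(Integer.MAX_VALUE));  // prints -2147483648
        System.out.println(nextLongId(Integer.MAX_VALUE)); // prints 2147483648
    }
}
```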



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        byteBuffer.mark();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:

Review Comment:
   MESSAGE_MAGIC_CODE_V2 should also be taken into consideration.
   See https://github.com/apache/rocketmq/blob/aa8ca48d478e58309f9a6b026e6fea7f4772c6e9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L386-L391 
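
One way this could look, as a sketch only: both message magic codes share the same branch of the `switch`. The constant values below are placeholders for illustration, not the real RocketMQ values, and `MagicCodeSwitchSketch` and `classify` are hypothetical names.

```java
// Hypothetical sketch: treat both message magic codes the same way in the switch.
final class MagicCodeSwitchSketch {

    // Placeholder values for illustration only; NOT the real RocketMQ constants.
    static final int MESSAGE_MAGIC_CODE = 1;
    static final int MESSAGE_MAGIC_CODE_V2 = 2;
    static final int BLANK_MAGIC_CODE = 3;

    static String classify(int magicCode) {
        switch (magicCode) {
            case MESSAGE_MAGIC_CODE:
            case MESSAGE_MAGIC_CODE_V2:
                // Both versions are ordinary messages and fall into the same branch.
                return "MESSAGE";
            case BLANK_MAGIC_CODE:
                return "BLANK";
            default:
                return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(MESSAGE_MAGIC_CODE_V2));
    }
}
```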



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1103922192


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {

Review Comment:
   I would accept it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096505718


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   OK.





[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104029335


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }

Review Comment:
   Ditto. We should avoid abbreviated variable names like idx, obj or rets.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }

Review Comment:
   Also, DispatchRequestOrderlyQueue#get always returns null, which looks odd. Consider changing it to the following form, which is more object-oriented:
   
   ```
   public List<DispatchRequest[]> get()
   ```
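
To illustrate the suggested signature, here is a minimal sketch of a reordering buffer whose `get()` returns the ready batches instead of filling a caller-supplied list. The class name `OrderedBatchBuffer`, the generic element type, and the unchecked exception thrown on interruption are assumptions made for this example; it is not the code that was merged. It assumes each sequence number is put exactly once and that batches are never null.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reordering buffer; the names are illustrative, not the PR's final code. */
final class OrderedBatchBuffer<T> {
    private final Object[] slots;
    private long nextIndex = 0; // sequence number of the next batch to hand out

    OrderedBatchBuffer(int capacity) {
        this.slots = new Object[capacity];
    }

    /** Stores a non-null batch under its sequence number, waiting while the ring has no slot for it. */
    synchronized void put(long batchIndex, T batch) {
        try {
            while (batchIndex >= nextIndex + slots.length) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a free slot", e);
        }
        slots[(int) (batchIndex % slots.length)] = batch;
    }

    /** Returns the batches that are ready in sequence order; empty if the next one has not arrived. */
    @SuppressWarnings("unchecked")
    synchronized List<T> get() {
        List<T> ready = new ArrayList<>();
        int slot = (int) (nextIndex % slots.length);
        while (slots[slot] != null) {
            ready.add((T) slots[slot]);
            slots[slot] = null;
            nextIndex++;
            slot = (int) (nextIndex % slots.length);
        }
        if (!ready.isEmpty()) {
            notifyAll(); // slots were freed, so blocked producers can retry
        }
        return ready;
    }

    public static void main(String[] args) {
        OrderedBatchBuffer<String> buffer = new OrderedBatchBuffer<>(4);
        buffer.put(1, "second");
        System.out.println(buffer.get()); // batch 0 is still missing
        buffer.put(0, "first");
        System.out.println(buffer.get()); // both batches, in sequence order
    }
}
```

Both methods hold the monitor for their whole body in this sketch, so the capacity check in `put` and the cursor update in `get` cannot interleave.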





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096615759


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   I can't confirm that reading the size of batchDispatchRequestQueue is thread-safe.
   
   The common practice is to set an appropriate rejection policy on the executor.





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096502996


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   > When there are more than 1024 runnable tasks, batchDispatchRequestExecutor has no RejectedExecutionHandler logic. Is the default RejectedExecutionHandler the right choice?
   
   I set the value to 1024 because I think that size is sufficient. Each task covers 4 MB, so a 1 GB commitlog file generates 256 tasks, and the queue can hold the 1024 tasks generated by four commitlog files. That is why I kept the default rejection policy.
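
The sizing argument above can be checked with a few lines of arithmetic. This is only a sketch: the constant names are made up for the example, the 1 GiB file size is taken from the comment rather than read from configuration, and because the diff flushes a batch only once it exceeds `BATCH_SIZE`, the real number of tasks per file would be at most the figure computed here.

```java
/** Back-of-the-envelope check of the queue sizing discussed in the review. */
final class DispatchQueueSizing {
    static final long BATCH_SIZE_BYTES = 4L * 1024 * 1024;          // BATCH_SIZE in the diff (4 MiB)
    static final long COMMIT_LOG_FILE_BYTES = 1024L * 1024 * 1024;  // assumed 1 GiB per commitlog file
    static final int QUEUE_CAPACITY = 1024;                          // LinkedBlockingQueue capacity in the diff

    /** Upper bound on the number of batch tasks produced by one commitlog file. */
    static long tasksPerFile() {
        return COMMIT_LOG_FILE_BYTES / BATCH_SIZE_BYTES;
    }

    /** Number of whole commitlog files whose tasks fit in the queue at once. */
    static long filesCoveredByQueue() {
        return QUEUE_CAPACITY / tasksPerFile();
    }

    public static void main(String[] args) {
        System.out.println("tasks per file: " + tasksPerFile());
        System.out.println("files covered by the queue: " + filesCoveredByQueue());
    }
}
```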





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097157413


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);

Review Comment:
   You can use the JDK's CallerRunsPolicy.





[GitHub] [rocketmq] codecov-commenter commented on pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#issuecomment-1383209679

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#5887](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e135ef7) into [develop](https://codecov.io/gh/apache/rocketmq/commit/5b64ffc9e86126342d2a441849124098e6e909ef?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5b64ffc) will **decrease** coverage by `0.13%`.
   > The diff coverage is `10.46%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #5887      +/-   ##
   =============================================
   - Coverage      42.43%   42.30%   -0.14%     
   + Complexity      8277     8272       -5     
   =============================================
     Files           1066     1066              
     Lines          73671    73840     +169     
     Branches        9630     9661      +31     
   =============================================
   - Hits           31261    31236      -25     
   - Misses         38460    38646     +186     
   - Partials        3950     3958       +8     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `48.02% <8.48%> (-4.90%)` | :arrow_down: |
   | [...ache/rocketmq/store/config/MessageStoreConfig.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2NvbmZpZy9NZXNzYWdlU3RvcmVDb25maWcuamF2YQ==) | `58.62% <50.00%> (-0.06%)` | :arrow_down: |
   | [.../main/java/org/apache/rocketmq/common/UtilAll.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vVXRpbEFsbC5qYXZh) | `36.09% <66.66%> (-0.44%)` | :arrow_down: |
   | [...tmq/remoting/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL3Byb3RvY29sL2JvZHkvQ29uc3VtZXJDb25uZWN0aW9uLmphdmE=) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
   | [...a/org/apache/rocketmq/broker/BrokerController.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyQ29udHJvbGxlci5qYXZh) | `43.29% <0.00%> (-2.86%)` | :arrow_down: |
   | [...pache/rocketmq/remoting/common/RemotingHelper.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL2NvbW1vbi9SZW1vdGluZ0hlbHBlci5qYXZh) | `15.38% <0.00%> (-2.31%)` | :arrow_down: |
   | [...mq/client/impl/consumer/RebalanceLitePullImpl.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VMaXRlUHVsbEltcGwuamF2YQ==) | `69.86% <0.00%> (-1.37%)` | :arrow_down: |
   | [...mq/store/ha/autoswitch/AutoSwitchHAConnection.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBQ29ubmVjdGlvbi5qYXZh) | `70.00% <0.00%> (-1.36%)` | :arrow_down: |
   | [...apache/rocketmq/store/ha/GroupTransferService.java](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0dyb3VwVHJhbnNmZXJTZXJ2aWNlLmphdmE=) | `92.30% <0.00%> (-1.29%)` | :arrow_down: |
   | ... and [12 more](https://codecov.io/gh/apache/rocketmq/pull/5887?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1072411674


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+

Review Comment:
   > using ServiceThread notify and wait logic
   
   Do you mean CountDownLatch2? Specifically, notifying and waiting between the MainBatchDispatchRequestService thread and ReputMessageService?



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);

Review Comment:
   > Duplicate code, please refactor into a method.
   
   Do you mean lines 2933 and 2934?





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096500515


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   When there are more than 1024 runnable tasks, batchDispatchRequestExecutor has no RejectedExecutionHandler logic. Is the default RejectedExecutionHandler the right choice?
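
For context on the question: when no handler is passed to the constructor, `ThreadPoolExecutor` uses `AbortPolicy`, which throws `RejectedExecutionException` once all threads are busy and the queue is full. The sketch below reproduces that with a deliberately tiny pool (one thread, one queue slot); the class and method names are hypothetical and the sizes are chosen only to make the rejection easy to trigger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows what the default rejection handler does when a bounded pool is saturated. */
final class DefaultRejectionDemo {

    /** Returns true if the third task is rejected while the worker is busy and the queue is full. */
    static boolean thirdTaskIsRejected() {
        // No handler argument, so the executor falls back to AbortPolicy.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // occupies the single worker thread
            executor.execute(blocker); // fills the one-slot queue
            executor.execute(blocker); // has nowhere to go
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```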





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096871291


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   The runnable passed to batchDispatchRequestExecutor.execute must be protected with try/catch/finally.
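
One way to read this request is sketched below: the task body runs inside `try`, failures are logged in `catch`, and bookkeeping that must always happen goes in `finally`. The `holdCount` counter is a hypothetical stand-in for `mappedPageHoldCount`, and the single-thread executor and method names are assumptions for the example, not the PR's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a pool task that releases its bookkeeping even when the work throws. */
final class GuardedTaskDemo {

    /** Runs the work on a pool thread and returns the hold count once the pool has terminated. */
    static int holdCountAfter(Runnable work) {
        AtomicInteger holdCount = new AtomicInteger(); // stand-in for mappedPageHoldCount
        ExecutorService executor = Executors.newSingleThreadExecutor();
        holdCount.incrementAndGet(); // taken when the task is created
        executor.execute(() -> {
            try {
                work.run();
            } catch (Exception e) {
                System.err.println("There is an exception in task execution: " + e);
            } finally {
                holdCount.decrementAndGet(); // released whether or not the work succeeded
            }
        });
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holdCount.get();
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfter(() -> { }));
        System.out.println(holdCountAfter(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

If the decrement sat at the end of the `try` body instead, a failing task would leave the counter permanently raised.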





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097193840


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);

Review Comment:
   > You can use the JDK's CallerRunsPolicy.
   
   I don't think CallerRunsPolicy works well here. While the calling thread executes run() itself, it is blocked for a long time, and it stays blocked for a while even after the workQueue has drained. With the current approach, the calling thread simply blocks until the workQueue has room.
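
The difference between the two policies can be seen in a small experiment. The sketch below saturates a one-thread, one-slot pool and reports whether the overflow task ends up running on the submitting thread. `BLOCKING_PUT` mirrors the handler in the diff, with the interrupt flag restored as an addition of this example; the class name, the helper thread, and the pool sizes are assumptions made for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Compares where an overflow task runs under two rejection handlers. */
final class RejectionPolicyComparison {

    /** Blocks the submitter until the work queue has room, like the handler in the diff. */
    static final RejectedExecutionHandler BLOCKING_PUT = (task, executor) -> {
        try {
            executor.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", e);
        }
    };

    /** Returns true if the rejected task was executed by the submitting thread. */
    static boolean overflowRunsOnCaller(RejectedExecutionHandler handler) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> overflowRunner = new AtomicReference<>();
        Thread caller = Thread.currentThread();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Frees the worker once the overflow task has run or the caller is parked in put().
        Thread helper = new Thread(() -> {
            try {
                while (overflowRunner.get() == null && caller.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                release.countDown();
            }
        });
        try {
            executor.execute(blocker); // occupies the single worker
            executor.execute(blocker); // fills the one-slot queue
            helper.start();
            executor.execute(() -> overflowRunner.set(Thread.currentThread())); // handed to the handler
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
        }
        return overflowRunner.get() == caller;
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(overflowRunsOnCaller(BLOCKING_PUT));
    }
}
```

With `CallerRunsPolicy` the submitter does the work itself and cannot enqueue anything else until that task finishes; with the blocking handler it only waits for a free slot and the task still runs on a pool thread.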





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1103920265


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();

Review Comment:
   I agree that the byteBuffer decoding logic should be extracted into a single method.
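
   As a rough illustration of what such an extracted method could look like, the sketch below decodes one window of a shared buffer through `duplicate()`. The `BatchDecoder` class and the 4-byte length prefix are assumptions made for this example. The real code would call `checkMessageAndReturnSize` and follow the CommitLog record layout instead.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   class BatchDecoder {

       // Decodes every length-prefixed record in [position, position + size) of the shared buffer.
       // The record format (int length + body) is hypothetical, not the CommitLog format.
       static List<byte[]> decode(ByteBuffer shared, int position, int size) {
           ByteBuffer view = shared.duplicate(); // own position/limit, same underlying bytes
           view.limit(position + size);          // set the limit first so position() cannot exceed it
           view.position(position);
           List<byte[]> records = new ArrayList<>();
           while (view.remaining() >= Integer.BYTES) {
               int length = view.getInt();
               if (length < 0 || length > view.remaining()) {
                   break; // malformed or truncated record: stop instead of reading past the window
               }
               byte[] body = new byte[length];
               view.get(body);
               records.add(body);
           }
           return records;
       }

       public static void main(String[] args) {
           ByteBuffer buffer = ByteBuffer.allocate(32);
           buffer.putInt(2).put(new byte[] {1, 2}).putInt(3).put(new byte[] {3, 4, 5});
           System.out.println(decode(buffer, 0, buffer.position()).size()); // 2
       }
   }
   ```

   Because the method works on a duplicate, the caller's buffer position is left untouched, which is what allows several worker threads to decode different windows of the same mapped buffer.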





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104126388


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }

Review Comment:
   I agree with you. The intention here was to avoid allocating a new List<DispatchRequest[]> on every call, which is why the results are added to the caller-supplied list and the return value is not set.
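
   One possible shape for this, sketched under assumptions: keep the caller-supplied list but return the number of entries drained rather than a value that is always null. `OrderedSlots` and `drainTo` are hypothetical names, and `String` stands in for `DispatchRequest[]`. The blocking `wait`/`notifyAll` part of the original is left out to keep the example short.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class OrderedSlots {

       private final String[] buffer;
       private long ptr = 0; // next batch index to hand out, in submission order

       OrderedSlots(int capacity) {
           this.buffer = new String[capacity];
       }

       // Stores a result under its batch index; results may arrive out of order.
       synchronized void put(long idx, String value) {
           if (idx < ptr || idx >= ptr + buffer.length) {
               throw new IllegalStateException("index outside the current window: " + idx);
           }
           buffer[(int) (idx % buffer.length)] = value;
       }

       // Appends the consecutive ready results to `out` and returns how many were added.
       synchronized int drainTo(List<String> out) {
           int drained = 0;
           while (drained < buffer.length) {
               int mod = (int) (ptr % buffer.length);
               if (buffer[mod] == null) {
                   break; // the next batch in order has not arrived yet
               }
               out.add(buffer[mod]);
               buffer[mod] = null;
               ptr++;
               drained++;
           }
           return drained;
       }

       public static void main(String[] args) {
           OrderedSlots slots = new OrderedSlots(4);
           List<String> reused = new ArrayList<>();   // allocated once by the consumer
           slots.put(1, "batch-1");
           System.out.println(slots.drainTo(reused)); // 0, because batch-0 is still missing
           slots.put(0, "batch-0");
           System.out.println(slots.drainTo(reused) + " " + reused); // 2 [batch-0, batch-1]
       }
   }
   ```

   This mirrors the convention of `BlockingQueue.drainTo`, which also fills a caller-owned collection and returns a count, so no list is allocated per call and the return value still carries information.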





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096617629


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   > i can't confirm that batchDispatchRequestQueue queue getSize method is concurrent safe
   > 
   > common usage set right reject policy of Executor.
   
   You are right, it is more appropriate to set a proper rejection policy on the Executor.
   
   (The queue's size() method is thread-safe, because the queue is a LinkedBlockingQueue.)





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097207514


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Here is a demo. Run it and compare how the two versions behave.
   
   ```
   package org.apache.rocketmq.test;
   
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.rocketmq.common.ThreadFactoryImpl;
   
   public class Test {
   
       
       public static void main(String[] args) {
           good(false);
           good(true);
           bad(false);
           bad(true);
   
       }
   
       public static void good(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 1,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("GoodBatchDispatchRequestServiceThread_"),
                   new ThreadPoolExecutor.CallerRunsPolicy());
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                       System.out.println("ThreadName:" + Thread.currentThread().getName());
                       if (throwException)
                           throw new RuntimeException("test");
                   } catch (Exception e) {
   
                   }
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   
       public static void bad(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 1,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("BadBatchDispatchRequestServiceThread_"));
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                   } catch (InterruptedException e) {
   
                   }
                   System.out.println("ThreadName:" + Thread.currentThread().getName());
                   if (throwException)
                       throw new RuntimeException("test");
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   }
   ```
   





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1101434693


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();

Review Comment:
   Use batchDispatchRequestQueue.poll() here instead of peek().
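
   A minimal sketch of the poll-first variant, assuming the queue is a plain `java.util.Queue`. `PollOnce` and `handOff` are illustrative names, and the `Consumer` stands in for the executor submission.

   ```java
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.function.Consumer;

   class PollOnce {

       // Takes the head with a single poll(); returns false when the queue was empty.
       static <T> boolean handOff(Queue<T> queue, Consumer<T> submit) {
           T task = queue.poll(); // null means empty, so no separate isEmpty()/peek() is needed
           if (task == null) {
               return false;
           }
           submit.accept(task);
           return true;
       }

       public static void main(String[] args) {
           Queue<String> queue = new ConcurrentLinkedQueue<>();
           queue.offer("task-0");
           System.out.println(handOff(queue, t -> System.out.println("submitted " + t))); // true
           System.out.println(handOff(queue, t -> System.out.println("submitted " + t))); // false
       }
   }
   ```

   One thing to weigh: with peek-then-poll, a task whose submission is rejected stays in the queue and can be retried, whereas with poll-first the task has already been removed, so the rejection has to be handled some other way, for example by a rejection handler that does not drop tasks.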





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104269711


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   Sorry, on second thought, I won't provide a way to modify BATCH_SIZE for now. BATCH_SIZE affects the number of tasks submitted to the thread pool, so a poorly chosen value could result in a large number of rejected-execution exceptions.
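
   A back-of-the-envelope sketch of that relationship, under stated assumptions: the 1 GiB backlog and the 64 KiB alternative are made-up figures, and the estimate ignores that a batch is only closed once it exceeds BATCH_SIZE, so it is an approximation. `BatchCount` is a hypothetical helper.

   ```java
   class BatchCount {

       // Approximate number of batch tasks needed to cover a backlog (ceiling division).
       static long tasksFor(long backlogBytes, long batchSizeBytes) {
           return (backlogBytes + batchSizeBytes - 1) / batchSizeBytes;
       }

       public static void main(String[] args) {
           long backlog = 1L << 30;        // hypothetical 1 GiB of undispatched CommitLog data
           long queueCapacity = 4096;      // capacity of the executor's LinkedBlockingQueue in the diff
           long current = tasksFor(backlog, 4L << 20);  // BATCH_SIZE = 4 MiB, as in the PR
           long small = tasksFor(backlog, 64L << 10);   // hypothetical 64 KiB setting
           System.out.println(current + " tasks, fits in queue: " + (current <= queueCapacity)); // 256 tasks, fits in queue: true
           System.out.println(small + " tasks, fits in queue: " + (small <= queueCapacity));     // 16384 tasks, fits in queue: false
       }
   }
   ```

   With the 4 MiB value the backlog maps to a few hundred tasks, well under the queue capacity, while a much smaller value would produce more tasks than the queue can hold if they were all submitted faster than the workers drain them.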





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104389996


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   LGTM.





[GitHub] [rocketmq] drpmma merged pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma merged PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071170469


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;
+                } finally {
+                    this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
+                    boolean over = this.mappedPageHoldCount.get() == 0;
+                    while (!over) {
+                        try {
+                            Thread.sleep(1);

Review Comment:
   Please use TimeUnit.MILLISECONDS.sleep(1) instead of Thread.sleep(1).
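
   A minimal sketch of the suggested form, applied to a wait loop like the one in the diff. `HoldCountWait`, `awaitZero`, and `runDemo` are hypothetical names used only for illustration. `TimeUnit.sleep` delegates to `Thread.sleep`, so behavior is unchanged; the gain is that the unit is explicit at the call site.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
class HoldCountWait {

    // Polls until the counter drops to zero, sleeping 1 ms between checks.
    // Returns false if the waiting thread is interrupted.
    static boolean awaitZero(AtomicInteger holdCount) {
        while (holdCount.get() != 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return true;
    }

    // Starts three threads that each release one hold, then waits for zero.
    static int runDemo() {
        AtomicInteger holdCount = new AtomicInteger(3);
        for (int i = 0; i < 3; i++) {
            new Thread(holdCount::getAndDecrement).start();
        }
        awaitZero(holdCount);
        return holdCount.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```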



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);

Review Comment:
   Duplicate code; please refactor it into a method.
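
   One possible shape for that refactoring, as a sketch only. The repeated pattern in the diff is "create the batch request, then reset start and size to -1". Because the start and size are local variables, a plain method cannot reset them, so this sketch assumes a small holder object. `BatchWindow` and its members are hypothetical names, and the `submitted` list stands in for the real `createBatchDispatchRequest` call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the PR.
class BatchWindow {
    private int start = -1;
    private int size = -1;

    // Stand-in for the queue of batch requests: each entry is {start, size}.
    final List<int[]> submitted = new ArrayList<>();

    // Adds one message to the current batch, opening a batch if none is open.
    void add(int messagePosition, int messageSize) {
        if (start == -1) {
            start = messagePosition;
            size = 0;
        }
        size += messageSize;
    }

    int size() {
        return size;
    }

    // Submits the open batch (if any) and resets the window in one place.
    void flush() {
        if (start >= 0) {
            submitted.add(new int[] {start, size});
        }
        start = -1;
        size = -1;
    }

    public static void main(String[] args) {
        BatchWindow window = new BatchWindow();
        window.add(0, 10);
        window.add(10, 20);
        window.flush();
        window.flush(); // no open batch, so nothing is submitted
        System.out.println(window.submitted.size()); // prints 1
    }
}
```

   With a helper like this, each of the three call sites in the loop would shrink to a single `flush()` call.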



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;

Review Comment:
   Missing comment: the `- 8` offset on this line is not explained.
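
   A sketch of what such a comment could say, based on the two `getInt()` calls that precede this line in the diff: each reads 4 bytes, so the position has advanced 8 bytes past the start of the message. `MessageStartDemo` and `messageStart` are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; not part of the PR.
class MessageStartDemo {

    // Reads the two 4-byte header fields and returns the message start offset.
    static int messageStart(ByteBuffer buffer) {
        buffer.getInt(); // total size field, 4 bytes
        buffer.getInt(); // magic code field, 4 bytes
        // The position is now 8 bytes past the first byte of the message,
        // so step back over both fields to find where the message begins.
        return buffer.position() - 8;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        buffer.position(4); // pretend a message starts at offset 4
        System.out.println(messageStart(buffer)); // prints 4
    }
}
```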



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),

Review Comment:
   OOM risk: this LinkedBlockingDeque is unbounded, so pending tasks can accumulate without limit.
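
   The difference can be checked directly, as a rough sketch. `QueueCapacityDemo` and its methods are hypothetical names. A `LinkedBlockingDeque` created with the no-argument constructor has a capacity of `Integer.MAX_VALUE`, while a queue built with an explicit capacity refuses new elements once full. The revision of this class quoted elsewhere in the thread uses `new LinkedBlockingQueue<>(4096)` with `AbortPolicy`, which is the bounded form.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the PR.
class QueueCapacityDemo {

    // Capacity of a deque created without an explicit bound.
    static int unboundedCapacity() {
        BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();
        return queue.remainingCapacity();
    }

    // Fills a bounded queue and reports whether the next offer is refused.
    static boolean boundedRefusesWhenFull(int capacity) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(() -> { });
        }
        return !queue.offer(() -> { });
    }

    public static void main(String[] args) {
        System.out.println(unboundedCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(boundedRefusesWhenFull(4096));             // prints true
    }
}
```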



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+

Review Comment:
   Missing stop or shutdown logic.
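
   A minimal sketch of the kind of shutdown logic this asks for, assuming a service that owns both a polling thread and an executor. `PollingService`, `pollLoop`, and `shutdown` are hypothetical names and do not reflect how the PR eventually addressed this.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical service; not part of the PR.
class PollingService {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private volatile boolean stopped = false;
    private final Thread poller = new Thread(this::pollLoop, "poller");

    void start() {
        poller.start();
    }

    // Loops until stop is requested instead of using while (true).
    private void pollLoop() {
        while (!stopped) {
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stops the poller, then drains and terminates the executor.
    // Returns true if both finished within the timeout.
    boolean shutdown(long timeoutMillis) {
        stopped = true;
        poller.interrupt();
        executor.shutdown();
        try {
            poller.join(timeoutMillis);
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)
                    && !poller.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PollingService service = new PollingService();
        service.start();
        System.out.println(service.shutdown(1000));
    }
}
```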



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {
+                while (true) {
+                    if (!batchDispatchRequestQueue.isEmpty()) {
+                        BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                        batchDispatchRequestExecutor.execute(() -> {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        });
+                    } else {
+                        try {
+                            Thread.sleep(1);

Review Comment:
   Consider `TimeUnit.MILLISECONDS.sleep(1)` here instead of `Thread.sleep(1)`.
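
   A minimal sketch of the suggested call, using only the JDK. The class and method names (`SleepDemo`, `pauseMillis`) are hypothetical and not part of the PR; the point is that the time unit is spelled out at the call site and the interrupt flag is restored if the sleep is interrupted.

   ```java
   import java.util.concurrent.TimeUnit;

   final class SleepDemo {
       // Sleeps for roughly the given number of milliseconds and returns the elapsed nanoseconds.
       static long pauseMillis(long millis) {
           long start = System.nanoTime();
           try {
               // Same effect as Thread.sleep(millis), with the unit visible in the code.
               TimeUnit.MILLISECONDS.sleep(millis);
           } catch (InterruptedException e) {
               // Restore the interrupt flag so callers can still observe the interruption.
               Thread.currentThread().interrupt();
           }
           return System.nanoTime() - start;
       }

       public static void main(String[] args) {
           System.out.println("slept for " + pauseMillis(1) + " ns");
       }
   }
   ```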



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;

Review Comment:
   Using byteBuffer.mark() and byteBuffer.reset() may make this code more readable.
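
   A rough sketch of what that could look like, assuming a record that starts with a 4-byte size followed by a 4-byte magic code, as in the loop above. `MarkResetDemo` and `peekHeader` are hypothetical names for illustration; this is not the PR's code.

   ```java
   import java.nio.ByteBuffer;

   final class MarkResetDemo {
       // Reads the size and magic code of the record at the current position,
       // then rewinds so the position points at the record start again.
       // Returns {recordStart, totalSize, magicCode}.
       static int[] peekHeader(ByteBuffer buffer) {
           buffer.mark();                    // remember where the record begins
           int totalSize = buffer.getInt();  // advances the position by 4
           int magicCode = buffer.getInt();  // advances the position by 4 more
           buffer.reset();                   // back to the marked position, no "- 8" arithmetic
           return new int[] {buffer.position(), totalSize, magicCode};
       }

       public static void main(String[] args) {
           ByteBuffer buffer = ByteBuffer.allocate(8);
           buffer.putInt(8).putInt(42);
           buffer.flip();
           int[] header = peekHeader(buffer);
           System.out.println("start=" + header[0] + " size=" + header[1] + " magic=" + header[2]);
       }
   }
   ```

   With this shape the record start is simply the buffer position after `reset()`, so the offset does not have to be recomputed by hand.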



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+

Review Comment:
   Consider using ServiceThread's notify and wait logic here.
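
   To illustrate the general idea only: the sketch below uses plain JDK monitors rather than RocketMQ's `ServiceThread`, and the names `WakeupSignal`, `wakeup`, and `waitForWork` are hypothetical. The consumer blocks until a producer signals that work is available (or a timeout passes) instead of polling in a sleep loop.

   ```java
   import java.util.concurrent.TimeUnit;

   final class WakeupSignal {
       private final Object lock = new Object();
       private boolean hasWork = false;

       // Called by the producer after it has queued a task.
       void wakeup() {
           synchronized (lock) {
               hasWork = true;
               lock.notifyAll();
           }
       }

       // Called by the consumer; returns true if woken by a producer, false on timeout.
       boolean waitForWork(long timeoutMillis) {
           long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
           synchronized (lock) {
               while (!hasWork) { // loop guards against spurious wakeups
                   long remaining = deadline - System.nanoTime();
                   if (remaining <= 0) {
                       return false;
                   }
                   try {
                       TimeUnit.NANOSECONDS.timedWait(lock, remaining);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       return false;
                   }
               }
               hasWork = false; // consume the signal
               return true;
           }
       }

       public static void main(String[] args) {
           WakeupSignal signal = new WakeupSignal();
           signal.wakeup();
           System.out.println("woken: " + signal.waitForWork(10));
       }
   }
   ```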



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071814465


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;

Review Comment:
   > This line seems redundant.
   
   Yes, it has been deleted.





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1072468992


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;

Review Comment:
   > using byteBuffer.mark(),byteBuffer.reset() may be more code readability
   
   mark() and reset() are now used (byteBuffer.mark(); byteBuffer.getInt(); byteBuffer.reset();). May I ask which parts of the code need comments?





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1098618349


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Please refactor the related code as soon as possible. @nowinkeyy





[GitHub] [rocketmq] guyinyou commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1100880726


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
-        private volatile long reputFromOffset = 0;
+        public volatile long reputFromOffset = 0;
 
         public long getReputFromOffset() {

Review Comment:
   "protected" is enough for subclasses to access this field; there is no need for "public".



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2851,264 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                        tmpByteBuffer.position(task.position);
+                        tmpByteBuffer.limit(task.position + task.size);
+                        List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                        while (tmpByteBuffer.hasRemaining()) {
+                            DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                            if (dispatchRequest.isSuccess()) {
+                                dispatchRequestList.add(dispatchRequest);
+                            } else {
+                                LOGGER.error("[BUG]read total count not equals msg total size.");
+                            }
+                        }
+                        dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                        mappedPageHoldCount.getAndDecrement();
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                LOGGER.warn(e.getMessage());

Review Comment:
   Maybe the following example is the right approach:
   
   `
   batchDispatchRequestExecutor.execute(() -> {
   try { xxxxxx } catch (Exception e) { }
   })
   `
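
   A small self-contained sketch of why the catch belongs inside the task, using a plain JDK executor. `TaskCatchDemo` and `runFailingTask` are hypothetical names. The task body runs on a worker thread, so a try/catch around `execute(...)` only sees submission problems such as rejection, never exceptions thrown by the task itself.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class TaskCatchDemo {
       // Returns {failures caught inside the task, failures seen by the submitter}.
       static int[] runFailingTask() {
           AtomicInteger caughtInTask = new AtomicInteger();
           AtomicInteger caughtBySubmitter = new AtomicInteger();
           ExecutorService executor = Executors.newSingleThreadExecutor();
           try {
               executor.execute(() -> {
                   try {
                       throw new IllegalStateException("simulated failure");
                   } catch (Exception e) {
                       caughtInTask.incrementAndGet(); // handled on the worker thread
                   }
               });
           } catch (Exception e) {
               caughtBySubmitter.incrementAndGet(); // only submission errors land here
           }
           executor.shutdown();
           try {
               executor.awaitTermination(5, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           return new int[] {caughtInTask.get(), caughtBySubmitter.get()};
       }

       public static void main(String[] args) {
           int[] counts = runFailingTask();
           System.out.println("inTask=" + counts[0] + " bySubmitter=" + counts[1]);
       }
   }
   ```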





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096504743


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2610,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+

Review Comment:
   AtomicLong may be a better type for maxPtr.
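
   A hedged illustration of the concern, not taken from the PR: if an `int` counter is ever incremented past `Integer.MAX_VALUE` it wraps to a negative value, and `%` on a negative value yields a negative slot index. `RingIndexDemo` and its methods are hypothetical names.

   ```java
   final class RingIndexDemo {
       // Slot index computed from an int counter; the addition wraps silently on overflow.
       static int slotOfIntCounter(int start, int steps, int capacity) {
           int counter = start;
           counter += steps;
           return counter % capacity;
       }

       // Slot index computed from a long counter; no overflow in this range.
       static int slotOfLongCounter(long start, int steps, int capacity) {
           long counter = start + steps;
           return (int) (counter % capacity);
       }

       public static void main(String[] args) {
           System.out.println("int slot:  " + slotOfIntCounter(Integer.MAX_VALUE, 2, 16));
           System.out.println("long slot: " + slotOfLongCounter(Integer.MAX_VALUE, 2, 16));
       }
   }
   ```

   A negative slot would throw `ArrayIndexOutOfBoundsException` when used to index the buffer, which is the kind of failure a `long` counter avoids.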





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097207514


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Here is a demo; consider the difference between the two approaches:
   
   ```
   package org.apache.rocketmq.test;
   
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.rocketmq.common.ThreadFactoryImpl;
   
   public class Test {
   
       
       public static void main(String[] args) {
           good(false);
           good(true);
           bad(false);
           bad(true);
   
       }
   
       public static void good(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 1,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("GoodBatchDispatchRequestServiceThread_"),
                   new ThreadPoolExecutor.CallerRunsPolicy());
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                       System.out.println("ThreadName:" + Thread.currentThread().getName());
                       if (throwException)
                           throw new RuntimeException("test");
                   } catch (Exception e) {
   
                   }
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   
       public static void bad(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 1,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("BadBatchDispatchRequestServiceThread_"));
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                   } catch (InterruptedException e) {
   
                   }
                   System.out.println("ThreadName:" + Thread.currentThread().getName());
                   if (throwException)
                       throw new RuntimeException("test");
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   }
   ```
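
   A more compact and deterministic variant of the same comparison is sketched below, under the assumption that the point of the demo is the rejection policy. `RejectionPolicyDemo` and `submitOverflow` are hypothetical names. With one worker and a queue of capacity one, the third task overflows: the default `AbortPolicy` throws `RejectedExecutionException`, while `CallerRunsPolicy` runs the task on the submitting thread.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.RejectedExecutionHandler;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   final class RejectionPolicyDemo {
       // Returns the name of the thread that ran the overflow task, or "rejected".
       static String submitOverflow(RejectedExecutionHandler handler) {
           CountDownLatch release = new CountDownLatch(1);
           ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                   new LinkedBlockingQueue<>(1), handler);
           String[] ranOn = new String[1];
           try {
               executor.execute(() -> awaitQuietly(release)); // occupies the single worker
               executor.execute(() -> { });                   // fills the queue
               executor.execute(() -> ranOn[0] = Thread.currentThread().getName()); // overflow
               return ranOn[0];
           } catch (RejectedExecutionException e) {
               return "rejected";
           } finally {
               release.countDown(); // let the blocked worker finish
               executor.shutdown();
           }
       }

       private static void awaitQuietly(CountDownLatch latch) {
           try {
               latch.await();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }

       public static void main(String[] args) {
           System.out.println(submitOverflow(new ThreadPoolExecutor.AbortPolicy()));
           System.out.println(submitOverflow(new ThreadPoolExecutor.CallerRunsPolicy()));
       }
   }
   ```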
   





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1102693285


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();

Review Comment:
   I think merging this into a single method is a bit difficult, because this part of the code is tightly coupled to the code that follows it; unlike checkMessageAndReturnSize, it does not stand on its own very well. The following parts, however, can be combined into one method:
   
   ![image](https://user-images.githubusercontent.com/72536832/218089678-8850a475-5a4d-4c69-b758-48fa3317f9bb.png)
   





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104204841


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   Maybe BATCH_SIZE should also be put into MessageStoreConfig.
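
To illustrate the suggestion above, here is a minimal sketch of what moving the threshold into configuration could look like. `StoreConfigSketch`, `BatchCutter`, and the field name `batchDispatchRequestSize` are hypothetical stand-ins, not the real MessageStoreConfig API; the default simply mirrors the hard-coded 4 MiB in the diff.

```java
// Hypothetical stand-in for MessageStoreConfig; the field name is an assumption.
class StoreConfigSketch {
    // Default mirrors the hard-coded BATCH_SIZE in the diff: 4 MiB.
    private int batchDispatchRequestSize = 1024 * 1024 * 4;

    public int getBatchDispatchRequestSize() {
        return batchDispatchRequestSize;
    }

    public void setBatchDispatchRequestSize(int batchDispatchRequestSize) {
        this.batchDispatchRequestSize = batchDispatchRequestSize;
    }
}

// Hypothetical helper that decides when an accumulated batch should be cut.
class BatchCutter {
    private final StoreConfigSketch config;

    BatchCutter(StoreConfigSketch config) {
        this.config = config;
    }

    // Same comparison as in the diff, but against the configured threshold.
    boolean shouldCut(int accumulatedSize) {
        return accumulatedSize > config.getBatchDispatchRequestSize();
    }
}
```

Reading the value through the getter on each check, rather than caching it in a constant, would let the threshold be tuned per broker without a rebuild.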





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1070791385


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {
+                while (true) {
+                    if (!batchDispatchRequestQueue.isEmpty()) {
+                        BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                        batchDispatchRequestExecutor.execute(() -> {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        });
+                    } else {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }, "MainBatchDispatchRequestServiceThread").start();
+
+            new Thread(() -> {

Review Comment:
   It would be better to use org.apache.rocketmq.common.ServiceThread instead of a plain Java thread.
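
As a rough illustration of why this matters: the `while (true)` loops in the hunk can never be stopped, whereas a service-style thread checks a stop flag and can be shut down cleanly. The sketch below uses a simplified stand-in, `StoppableWorker`, which is NOT the real org.apache.rocketmq.common.ServiceThread; `DrainWorker` and its fields are likewise hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the lifecycle a service-thread base class provides.
// It is NOT the real org.apache.rocketmq.common.ServiceThread.
abstract class StoppableWorker implements Runnable {
    private volatile boolean stopped = false;
    private Thread thread;

    abstract String getServiceName();

    void start() {
        thread = new Thread(this, getServiceName());
        thread.start();
    }

    boolean isStopped() {
        return stopped;
    }

    // Ask the loop to exit, then wait for the thread to finish.
    void shutdown() {
        stopped = true;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical worker that drains a task queue until it is told to stop.
class DrainWorker extends StoppableWorker {
    final ConcurrentLinkedQueue<Integer> tasks = new ConcurrentLinkedQueue<>();
    final AtomicInteger handled = new AtomicInteger();

    @Override
    String getServiceName() {
        return "DrainWorker";
    }

    @Override
    public void run() {
        // Unlike while (true), this loop ends once shutdown() is called.
        while (!isStopped()) {
            Integer task = tasks.poll();
            if (task == null) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                handled.incrementAndGet();
            }
        }
    }
}
```

With this shape, the owning store can stop the worker during its own shutdown instead of leaving a thread spinning forever.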



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {

Review Comment:
   How about creating a new class that extends ReputMessageService? We can then initialize a different implementation according to the isEnableBuildConsumeQueueConcurrently config. That way, the ReputMessageService code stays unchanged and the code is easier to read.
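
A minimal sketch of the selection pattern described above, using stripped-down hypothetical classes (`ReputServiceSketch`, `ConcurrentReputServiceSketch`, `ReputServiceFactory`) rather than the real store classes. The boolean parameter plays the role of the isEnableBuildConsumeQueueConcurrently setting.

```java
// Hypothetical, stripped-down names; only the selection pattern matters here.
class ReputServiceSketch {
    // Stands in for the existing, unchanged serial reput logic.
    String doReput() {
        return "serial";
    }
}

class ConcurrentReputServiceSketch extends ReputServiceSketch {
    // The concurrent path lives entirely in the subclass.
    @Override
    String doReput() {
        return "concurrent";
    }
}

class ReputServiceFactory {
    // The flag plays the role of isEnableBuildConsumeQueueConcurrently().
    static ReputServiceSketch create(boolean enableBuildConsumeQueueConcurrently) {
        return enableBuildConsumeQueueConcurrently
                ? new ConcurrentReputServiceSketch()
                : new ReputServiceSketch();
    }
}
```

Callers keep a reference of the base type, so the rest of the store does not need to know which implementation was chosen.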



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;

Review Comment:
   This line seems redundant.
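
To spell out why: a `catch` that only rethrows behaves the same as having no `catch` at all. The sketch below assumes the `try` exists for the sake of a `finally` clause, which is not visible in the quoted hunk; `RethrowSketch` and its methods are hypothetical and only record the order of events.

```java
import java.util.ArrayList;
import java.util.List;

class RethrowSketch {
    // Variant with the catch-and-rethrow, as in the quoted hunk.
    static List<String> withCatch(boolean fail) {
        List<String> log = new ArrayList<>();
        try {
            try {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                log.add("body");
            } catch (Throwable e) {
                throw e; // adds nothing: the exception propagates either way
            } finally {
                log.add("release");
            }
        } catch (IllegalStateException e) {
            log.add("caught");
        }
        return log;
    }

    // Same logic with the catch clause removed.
    static List<String> withoutCatch(boolean fail) {
        List<String> log = new ArrayList<>();
        try {
            try {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                log.add("body");
            } finally {
                log.add("release");
            }
        } catch (IllegalStateException e) {
            log.add("caught");
        }
        return log;
    }
}
```

Both variants run the `finally` block before the exception reaches the outer handler, so dropping the `catch` should not change behavior.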



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {

Review Comment:
   It would be better to extend org.apache.rocketmq.common.ServiceThread instead of using a raw Java thread.
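
As a rough illustration of the pattern this comment points to, the sketch below uses a small stand-in base class with a stop flag, a named thread, and a shutdown that interrupts and joins. `SimpleService`, `PollingService`, and `runAndStop` are hypothetical names invented for this example; this is not RocketMQ's actual `ServiceThread` implementation, only the general shape that the later hunks in this thread rely on (`run()`, `isStopped()`, `getServiceName()`, `start()`, `shutdown()`).

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StoppableServiceDemo {

    // Hypothetical stand-in for a ServiceThread-style base class (not RocketMQ code).
    public abstract static class SimpleService implements Runnable {
        private volatile boolean stopped = false;
        private Thread thread;

        public abstract String getServiceName();

        public boolean isStopped() {
            return stopped;
        }

        public void start() {
            thread = new Thread(this, getServiceName());
            thread.start();
        }

        // Sets the stop flag, interrupts a possible sleep, and waits for the loop to exit.
        public void shutdown() {
            stopped = true;
            thread.interrupt();
            try {
                thread.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public boolean isAlive() {
            return thread != null && thread.isAlive();
        }
    }

    // A polling loop in the same style as the services shown in the diff.
    public static class PollingService extends SimpleService {
        final AtomicInteger iterations = new AtomicInteger();

        @Override
        public String getServiceName() {
            return PollingService.class.getSimpleName();
        }

        @Override
        public void run() {
            while (!isStopped()) {
                iterations.incrementAndGet();
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // shutdown() interrupted the sleep; the loop re-checks the stop flag
                }
            }
        }
    }

    // Starts the service, stops it, and reports whether the thread has exited.
    public static boolean runAndStop() {
        PollingService service = new PollingService();
        service.start();
        service.shutdown();
        return !service.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runAndStop());
    }
}
```

Compared with an anonymous `new Thread(() -> ...)`, a named service with an explicit stop flag can be started and shut down together with the owning component.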



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#issuecomment-1383435296

   Hi @nowinkeyy, it would be better to provide a performance test report.




[GitHub] [rocketmq] nowinkeyy commented on pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
nowinkeyy commented on PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#issuecomment-1397387101

   Test data was provided by @guyinyou
   
   ![image](https://user-images.githubusercontent.com/72536832/213522402-991ec9a3-2336-441e-bcda-af322ef1bd70.png)
   ![image](https://user-images.githubusercontent.com/72536832/213522442-01521c3a-c657-49ae-9969-620017e55307.png)
   




[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096505607


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   > > When there are more than 1024 runnable tasks, batchDispatchRequestExecutor has no RejectedExecutionHandler logic. Is the default RejectedExecutionHandler the right choice?
   > 
   > I set the value to 1024 because I think that size is sufficient. Each task is 4 MB, so a 1 GB CommitLog file generates 256 tasks, and the queue can hold the 1024 tasks generated by four CommitLog files. That is why I kept the default rejection policy.
   
   Got it.
   
   How about adjusting it to 4096?
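
The sizing argument quoted above is plain arithmetic and can be checked with a small sketch. It assumes every task covers exactly 4 MB and every CommitLog file is exactly 1 GB; in the reviewed code a batch is emitted once it exceeds BATCH_SIZE, so real tasks are slightly larger and slightly fewer. The class and method names are illustrative and are not part of RocketMQ.

```java
public class DispatchQueueSizing {

    // Number of fixed-size batch tasks needed to cover one CommitLog file (rounded up).
    public static long tasksPerFile(long fileSizeBytes, long batchSizeBytes) {
        return (fileSizeBytes + batchSizeBytes - 1) / batchSizeBytes;
    }

    // Number of whole CommitLog files whose tasks fit into a queue of the given capacity.
    public static long filesCovered(int queueCapacity, long fileSizeBytes, long batchSizeBytes) {
        return queueCapacity / tasksPerFile(fileSizeBytes, batchSizeBytes);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // 1 GB CommitLog file (assumed)
        long batchSize = 4L * 1024 * 1024;   // 4 MB per task, as stated in the comment

        System.out.println("tasks per file: " + tasksPerFile(fileSize, batchSize));
        System.out.println("files covered by 1024 slots: " + filesCovered(1024, fileSize, batchSize));
        System.out.println("files covered by 4096 slots: " + filesCovered(4096, fileSize, batchSize));
    }
}
```

Under these assumptions a capacity of 1024 covers four files and 4096 covers sixteen, which is the trade-off the two comments are weighing.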
   





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1100856585


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,287 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        byteBuffer.mark();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        byteBuffer.reset();
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } finally {
+                    this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
+                    boolean over = mappedPageHoldCount.get() == 0;
+                    while (!over) {
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(1);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                        over = mappedPageHoldCount.get() == 0;
+                    }
+                    result.release();
+                }
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException ignored) {
+                }
+            }
+
+            if (this.isCommitLogAvailable()) {
+                LOGGER.warn("shutdown concurrentReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max" +
+                                " offset={}, reputFromOffset={}", DefaultMessageStore.this.commitLog.getMaxOffset(),
+                        this.reputFromOffset);
+            }
+
+            this.mainBatchDispatchRequestService.shutdown();
+            this.dispatchService.shutdown();
+            super.shutdown();
+        }
+
+        @Override
+        public void run() {

Review Comment:
   Remove this override and simply inherit the method from the parent class.





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1101434964


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();

Review Comment:
   Delete batchDispatchRequestQueue.poll().
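
For context, the hunk above reads the head of the queue with peek(), hands it to the executor, and only then removes it with poll(). A minimal sketch of that peek-then-poll pattern follows, assuming a single consumer thread (with several consumers, peek() and poll() could see different elements). `PeekThenPollDemo` and its methods are hypothetical names used only for illustration, and the sketch does not try to say which variant the reviewer prefers.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

public class PeekThenPollDemo {

    // Submits the head of the queue and removes it only after the executor accepted it.
    public static boolean submitHead(Queue<Runnable> queue, Executor executor) {
        Runnable task = queue.peek();   // look at the head without removing it
        if (task == null) {
            return false;
        }
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            return false;               // the head stays queued and can be retried later
        }
        queue.poll();                   // accepted, so remove it now
        return true;
    }

    // Queue size after a submission that the executor rejects.
    public static int remainingAfterRejection() {
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        queue.offer(() -> { });
        Executor alwaysFull = r -> {
            throw new RejectedExecutionException("full");
        };
        submitHead(queue, alwaysFull);
        return queue.size();
    }

    // Queue size after a submission that the executor accepts.
    public static int remainingAfterAccept() {
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        queue.offer(() -> { });
        Executor direct = Runnable::run; // runs the task in the calling thread
        submitHead(queue, direct);
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("left after rejection: " + remainingAfterRejection());
        System.out.println("left after accept: " + remainingAfterAccept());
    }
}
```

Calling poll() first and submitting afterwards is shorter, but a task rejected by the executor would then already be gone from the queue.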





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1097207514


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Here is a demo. Run it and think about the differences.
   
   ```
   package org.apache.rocketmq.test;
   
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.rocketmq.common.ThreadFactoryImpl;
   
   public class Test {
   
       
       public static void main(String[] args) {
           good(false);
           good(true);
           bad(false);
           bad(true);
   
       }
   
       public static void good(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 2,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("GoodBatchDispatchRequestServiceThread_"),
                   new ThreadPoolExecutor.CallerRunsPolicy());
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                       System.out.println("ThreadName:" + Thread.currentThread().getName());
                       if (throwException)
                           throw new RuntimeException("test");
                   } catch (Exception e) {
   
                   }
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   
       public static void bad(boolean throwException) {
           ExecutorService executor = new ThreadPoolExecutor(1, 2,
                   1000 * 60,
                   TimeUnit.MICROSECONDS,
                   new LinkedBlockingQueue<>(2),
                   new ThreadFactoryImpl("BadBatchDispatchRequestServiceThread_"));
           for (int i = 0; i < 15; i++) {
               executor.execute(() -> {
                   try {
                       TimeUnit.SECONDS.sleep(1);
                   } catch (InterruptedException e) {
   
                   }
                   System.out.println("ThreadName:" + Thread.currentThread().getName());
                   if (throwException)
                       throw new RuntimeException("test");
               });
           }
   
           for (int i = 0; i < 5; i++) {
               executor.execute(() -> {
                   System.out.println("testbb");
               });
           }
           System.out.println("test-cccccccc");
   
       }
   }
   ```
   





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1098215771


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   The differences, as I understand them:
   1) good() uses CallerRunsPolicy as its rejection policy, so the calling thread (main) runs the overflowed tasks itself. bad() does not specify a rejection policy, so it falls back to the default AbortPolicy: overflowed tasks are rejected and execute() throws a RejectedExecutionException.
   2) Exceptions thrown inside the try block are caught. Exceptions thrown outside it propagate out of the task and terminate the worker thread, and the executor then creates a new thread to replace it (the previous thread has been destroyed).
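
Both points can be reproduced without timing dependencies. The sketch below uses only java.util.concurrent and is a reduced variant of the demo, not the reviewer's code: it uses a pool of one thread, and for point 1 a one-slot queue, so the third submission always overflows. `ExecutorBehaviorDemo`, `overflowOutcome`, and `threadsCreated` are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ExecutorBehaviorDemo {

    // Point 1: fate of a task submitted while the only worker and the queue are both full.
    public static String overflowOutcome(RejectedExecutionHandler handler) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            executor.execute(blocker); // taken by the only worker thread
            executor.execute(blocker); // fills the one-slot queue
            executor.execute(() -> ranOn.set(Thread.currentThread())); // overflows
            return ranOn.get() == Thread.currentThread() ? "caller-ran" : "other";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    // Point 2: number of threads the pool creates to run two tasks when the first one fails.
    public static int threadsCreated(boolean catchInsideTask) {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker-" + created.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep stderr quiet
            return t;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        CountDownLatch secondDone = new CountDownLatch(1);
        executor.execute(() -> {
            if (catchInsideTask) {
                try {
                    throw new RuntimeException("test");
                } catch (RuntimeException e) {
                    // handled inside the task, so the worker thread survives
                }
            } else {
                throw new RuntimeException("test"); // escapes the task, the worker thread dies
            }
        });
        executor.execute(secondDone::countDown);
        try {
            secondDone.await(5, TimeUnit.SECONDS);
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(overflowOutcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(overflowOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(threadsCreated(false));
        System.out.println(threadsCreated(true));
    }
}
```

With AbortPolicy the overflow surfaces as an exception in the submitting thread, while CallerRunsPolicy slows the submitter down instead of losing the task. Catching exceptions inside the task keeps the pool from repeatedly replacing worker threads.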





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1103922836


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,282 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();

Review Comment:
   > Do you want to submit another commit to combine it into a method?
   
   Yeah, I will.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104056423


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }

Review Comment:
   > And DispatchRequestOrderlyQueue#get always returns null, which looks weird. Changed to the following form to be more object-oriented.
   
   How about I change it to void? Because it doesn't need a return value.





[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104072908


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private long id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        long ptr = 0;
+
+        AtomicLong maxPtr = new AtomicLong();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(long idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = (int) (idx % this.buffer.length);
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = (int) (ptr % this.buffer.length);
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }

Review Comment:
   > > And DispatchRequestOrderlyQueue#get always returns null, which looks weird. Changed to the following form to be more object-oriented.
   > 
   > How about I change it to void? Because it doesn't need a return value.
   
   `public void get(List<DispatchRequest[]> rets)` reads like C style, where an output parameter is used to avoid returning a pointer to a local variable (a dangling pointer). In Java, a `get` method is normally expected to return its result.
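
A minimal sketch of what a value-returning `get` could look like. This is not the PR's code: `OrderedSlotQueue` is a hypothetical, simplified stand-in for `DispatchRequestOrderlyQueue` that stores strings instead of `DispatchRequest[]` and synchronizes the whole of `put`, which the diff above does not do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DispatchRequestOrderlyQueue (illustration only). */
class OrderedSlotQueue {
    private final String[] buffer;
    private long ptr = 0; // id of the next batch to hand out

    OrderedSlotQueue(int slots) {
        this.buffer = new String[slots];
    }

    /** Stores a batch under its id; waits while the id is too far ahead of ptr. */
    synchronized void put(long id, String batch) {
        while (ptr + buffer.length <= id) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a free slot", e);
            }
        }
        buffer[(int) (id % buffer.length)] = batch;
    }

    /** Returns the batches that are ready, in id order; empty if the next id is missing. */
    synchronized List<String> get() {
        List<String> ready = new ArrayList<>();
        for (int i = 0; i < buffer.length; i++) {
            int slot = (int) (ptr % buffer.length);
            if (buffer[slot] == null) {
                break; // the next batch in order has not arrived yet
            }
            ready.add(buffer[slot]);
            buffer[slot] = null;
            ptr++;
        }
        notifyAll(); // wake producers that were waiting for a free slot
        return ready;
    }
}
```

With this shape the caller writes `List<String> ready = queue.get();` and can treat an empty list as "nothing to dispatch yet", so no reusable output list has to be cleared and passed in.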
   





[GitHub] [rocketmq] fuyou001 commented on pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
fuyou001 commented on PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#issuecomment-1383971033

   Please add unit tests.




[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by GitBox <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071877718


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;
+                } finally {
+                    this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
+                    boolean over = this.mappedPageHoldCount.get() == 0;
+                    while (!over) {
+                        try {
+                            Thread.sleep(1);

Review Comment:
   > Replace Thread.sleep with TimeUnit.MILLISECONDS.sleep(1)
   
   OK.





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1099528439


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2851,264 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                        tmpByteBuffer.position(task.position);
+                        tmpByteBuffer.limit(task.position + task.size);
+                        List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                        while (tmpByteBuffer.hasRemaining()) {
+                            DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                            if (dispatchRequest.isSuccess()) {
+                                dispatchRequestList.add(dispatchRequest);
+                            } else {
+                                LOGGER.error("[BUG]read total count not equals msg total size.");
+                            }
+                        }
+                        dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                        mappedPageHoldCount.getAndDecrement();
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                LOGGER.warn(e.getMessage());

Review Comment:
   Log the exception itself rather than only its message, like the other services do: DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1099674333


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2851,264 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();

Review Comment:
   Wrap the body of this task in try..catch.
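
A minimal sketch of why the guard matters, under stated assumptions: `holdCount` is a stand-in for `mappedPageHoldCount`, and the failing `work` simulates an exception while parsing a batch. Doing the decrement in `finally` is a choice made for this sketch, not something the PR does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedTaskDemo {

    /** Runs one failing task and returns the hold count left behind. */
    static int holdCountAfterFailure(boolean guarded) {
        AtomicInteger holdCount = new AtomicInteger(1); // stand-in for mappedPageHoldCount
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable work = () -> {
            throw new IllegalStateException("simulated parse failure");
        };

        pool.execute(() -> {
            if (guarded) {
                try {
                    work.run();
                } catch (Exception e) {
                    System.err.println("task failed: " + e);
                } finally {
                    holdCount.decrementAndGet(); // runs even when work fails
                }
            } else {
                work.run();
                holdCount.decrementAndGet(); // skipped because work.run() threw
            }
        });

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holdCount.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + holdCountAfterFailure(false));
        System.out.println("guarded:   " + holdCountAfterFailure(true));
    }
}
```

In the unguarded case the counter stays at 1, so any loop that waits for it to reach 0 would never finish; in the guarded case it drops to 0 and the failure is still logged.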





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096504743


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2610,77 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+

Review Comment:
   maxPtr would probably be better as an AtomicLong.
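
A small sketch of the overflow concern, assuming the slot is computed as `(int) (idx % length)` as in the diff. The buffer length of 10 is chosen only for illustration. An `int` counter wraps to a negative value after 2^31 increments, and Java's `%` keeps the sign of the dividend, so the computed slot becomes a negative array index.

```java
class CounterOverflowDemo {

    /** Slot computed the same way as in the diff: (int) (idx % length). */
    static int slotFor(long idx, int length) {
        return (int) (idx % length);
    }

    public static void main(String[] args) {
        int intId = Integer.MAX_VALUE;
        intId++; // wraps around to Integer.MIN_VALUE

        long longId = Integer.MAX_VALUE;
        longId++; // 2147483648, no wrap-around

        System.out.println(slotFor(intId, 10));  // -8: not a valid array index
        System.out.println(slotFor(longId, 10)); // 8
    }
}
```

At a hypothetical rate of one batch per millisecond, 2^31 ids would last roughly 25 days, whereas a `long` counter does not wrap in any realistic time span.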





[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096863845


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2852,269 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
+        }
+
+        private void pollBatchDispatchRequest() {
+            if (!batchDispatchRequestQueue.isEmpty()) {
+                BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                batchDispatchRequestExecutor.execute(() -> {
+                    ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                    tmpByteBuffer.position(task.position);
+                    tmpByteBuffer.limit(task.position + task.size);
+                    List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                    while (tmpByteBuffer.hasRemaining()) {
+                        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                        if (dispatchRequest.isSuccess()) {
+                            dispatchRequestList.add(dispatchRequest);
+                        } else {
+                            LOGGER.error("[BUG]read total count not equals msg total size.");
+                        }
+                    }
+                    dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                    mappedPageHoldCount.getAndDecrement();
+                });
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private int batchId = 0;

Review Comment:
   The type of `batchId` should be changed to `long`.
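
To illustrate the concern behind this request: an `int` counter that is incremented once per batch wraps to a negative value after `Integer.MAX_VALUE` increments, which could disturb any ordering keyed on the id. A minimal sketch follows; the class and method names are hypothetical and not part of the PR.

```java
final class BatchIdOverflowDemo {

    // Next id when the counter is an int: silently wraps past Integer.MAX_VALUE.
    static int nextIntId(int current) {
        return current + 1;
    }

    // Next id when the counter is a long: keeps growing monotonically.
    static long nextLongId(long current) {
        return current + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextIntId(Integer.MAX_VALUE));  // -2147483648
        System.out.println(nextLongId(Integer.MAX_VALUE)); // 2147483648
    }
}
```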



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "fuyou001 (via GitHub)" <gi...@apache.org>.
fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096519227


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   @nowinkeyy, sorry.
   
   After thinking about it for a moment: the right RejectedExecutionHandler must be set, because if a rejection occurs, messages may be lost. The
   reject policy can block the task, but it can't throw an exception, since that would cause message loss.
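
One way to read this suggestion is a rejection handler that blocks the submitting thread until the work queue has room, instead of throwing as `AbortPolicy` does. The sketch below is only an illustration under that assumption; `BlockWhenFull` and `BlockingRejectionDemo` are hypothetical names, and the pool sizes are deliberately tiny so that rejections actually occur.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingRejectionDemo {

    // Hypothetical handler: blocks the submitter until the queue has space,
    // so a full queue slows the producer down instead of dropping the task.
    static final class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                executor.getQueue().put(task); // waits for free capacity
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    // Submits taskCount tasks to a one-thread pool with a two-slot queue
    // and returns how many of them actually ran.
    static int runAll(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new BlockWhenFull());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown(); // already queued tasks still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100));
    }
}
```

The JDK's built-in `ThreadPoolExecutor.CallerRunsPolicy` is another option that avoids dropping tasks; it runs the rejected task on the submitting thread rather than waiting for queue space.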
   
   
   
   





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1096546335


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2850,258 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(1024),

Review Comment:
   > @nowinkeyy, sorry.
   > 
   > After thinking about it for a moment: the right RejectedExecutionHandler must be set, because if a rejection occurs, messages may be lost. The reject policy can block the task, but it can't throw an exception, since that would cause message loss.
   
   You are right. I have an idea, though I don't know whether it is feasible: before putting a task on the thread pool queue, check whether the queue size is less than 1024, as shown in the figure:
   ![image](https://user-images.githubusercontent.com/72536832/216774028-71396e47-3c39-452e-b7ea-913145f2f2c3.png)
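
The figure is not reproduced in this archive, so the following is only a guess at the idea: the polling thread hands a task to the executor only when the executor's work queue is below its capacity, and otherwise leaves the task pending for the next loop. All names here are hypothetical. The check is only safe if a single thread submits to the executor, since `size()` is a snapshot.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class QueueSizeGuardDemo {

    // Hands over the head of `pending` only if the executor queue has room.
    // Returns true when a task was submitted, false when the caller should retry later.
    static boolean submitIfRoom(Queue<Runnable> pending, ThreadPoolExecutor executor, int capacity) {
        Runnable task = pending.peek();
        if (task == null || executor.getQueue().size() >= capacity) {
            return false;
        }
        executor.execute(task);
        pending.poll(); // remove only after the hand-over succeeded
        return true;
    }

    // Counts how many tasks are accepted before the guard backs off while the
    // single worker is held busy: one running task plus `capacity` queued ones.
    static int acceptedBeforeBackoff(int capacity) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(capacity));
        Queue<Runnable> pending = new ArrayDeque<>();
        for (int i = 0; i < capacity + 5; i++) {
            pending.add(() -> {
                try {
                    gate.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int accepted = 0;
        while (submitIfRoom(pending, executor, capacity)) {
            accepted++;
        }
        gate.countDown();    // let the worker drain the queue
        executor.shutdown();
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeBackoff(2));
    }
}
```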





[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104023612


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {

Review Comment:
   Could you give these variables clearer names or add some comments?





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104209698


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   I agree with you.





[GitHub] [rocketmq] drpmma commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "drpmma (via GitHub)" <gi...@apache.org>.
drpmma commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104340762


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   BATCH_SIZE can be assigned from MessageStoreConfig here; a change to it will then only take effect at broker startup.
   
   https://github.com/apache/rocketmq/blob/02577ed7230a3587cb04ba65abdd6870230c4d7e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L2970
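
A minimal sketch of what this could look like, assuming the value is read once in the service constructor. `StoreConfig` stands in for `MessageStoreConfig`, and the property name `batchDispatchRequestSize` is invented for illustration; neither is taken from the PR.

```java
final class BatchSizeConfigDemo {

    // Hypothetical stand-in for MessageStoreConfig; the property name is assumed.
    static final class StoreConfig {
        private int batchDispatchRequestSize = 1024 * 1024 * 4; // same default as the PR's BATCH_SIZE

        int getBatchDispatchRequestSize() {
            return batchDispatchRequestSize;
        }

        void setBatchDispatchRequestSize(int size) {
            this.batchDispatchRequestSize = size;
        }
    }

    // Hypothetical stand-in for the reput service: the batch size is captured
    // once at construction, so later config changes do not affect this instance.
    static final class ReputService {
        private final int batchSize;

        ReputService(StoreConfig config) {
            this.batchSize = config.getBatchDispatchRequestSize();
        }

        // Mirrors the `batchDispatchRequestSize > BATCH_SIZE` check in the diff.
        boolean shouldCreateBatch(int accumulatedBytes) {
            return accumulatedBytes > batchSize;
        }
    }

    public static void main(String[] args) {
        StoreConfig config = new StoreConfig();
        ReputService service = new ReputService(config);
        config.setBatchDispatchRequestSize(1024);            // changed after "startup"
        System.out.println(service.shouldCreateBatch(2048)); // still uses the 4 MiB value
    }
}
```

Reading the value in the constructor keeps the hot loop free of config lookups, at the cost of requiring a restart to change the batch size.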
   
   
   
   





[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

Posted by "nowinkeyy (via GitHub)" <gi...@apache.org>.
nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1104366365


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2835,291 @@ public String getServiceName() {
 
     }
 
+    class MainBatchDispatchRequestService extends ServiceThread {
+
+        private final ExecutorService batchDispatchRequestExecutor;
+
+        public MainBatchDispatchRequestService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new ThreadPoolExecutor.AbortPolicy());
+        }
+
+        private void pollBatchDispatchRequest() {
+            try {
+                if (!batchDispatchRequestQueue.isEmpty()) {
+                    BatchDispatchRequest task = batchDispatchRequestQueue.peek();
+                    batchDispatchRequestExecutor.execute(() -> {
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
+                        }
+                    });
+                    batchDispatchRequestQueue.poll();
+                }
+            } catch (Exception e) {
+                DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    pollBatchDispatchRequest();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName();
+            }
+            return MainBatchDispatchRequestService.class.getSimpleName();
+        }
+
+    }
+
+    class DispatchService extends ServiceThread {
+
+        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();
+
+        private void dispatch() {
+            dispatchRequestsList.clear();
+            dispatchRequestOrderlyQueue.get(dispatchRequestsList);
+            if (!dispatchRequestsList.isEmpty()) {
+                for (DispatchRequest[] dispatchRequests : dispatchRequestsList) {
+                    for (DispatchRequest dispatchRequest : dispatchRequests) {
+                        DefaultMessageStore.this.doDispatch(dispatchRequest);
+                        // wake up long-polling
+                        if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
+                                && DefaultMessageStore.this.messageArrivingListener != null) {
+                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+                            DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
+                        }
+                        if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
+                                DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                            DefaultMessageStore.this.storeStatsService
+                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                    .add(dispatchRequest.getMsgSize());
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                    dispatch();
+                } catch (Exception e) {
+                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
+                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName();
+            }
+            return DispatchService.class.getSimpleName();
+        }
+    }
+
+    class ConcurrentReputMessageService extends ReputMessageService {
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private long batchId = 0;
+
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService() {
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
+        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
+        @Override
+        public void doReput() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position();
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {

Review Comment:
   You are right, but we would rather not expose BATCH_SIZE for now. Tuning this parameter does not make much sense at this stage because it is not the bottleneck, and setting it too small may cause problems. (If you have further reasons, please let me know and I will make the change.)
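
To make the "too small" concern concrete, here is a rough sketch of how the number of batches grows as the limit shrinks. It assumes that a batch is flushed and the counter reset as soon as the accumulated size exceeds the limit, since the diff is cut off right after that comparison. `countBatches` is a hypothetical helper, not code from the PR. One plausible cost of many small batches is pressure on the bounded `LinkedBlockingQueue<>(4096)` combined with `AbortPolicy` in the executor above, though that reading is an assumption rather than something stated in this thread.

```java
final class BatchCountSketch {
    // Hypothetical helper: counts how many batches the accumulate-then-flush
    // loop would produce for the given message sizes and batch limit.
    static int countBatches(int[] messageSizes, int batchSize) {
        int batches = 0;
        int accumulated = 0;
        for (int size : messageSizes) {
            accumulated += size;
            if (accumulated > batchSize) {
                // Assumed behaviour: flush the current batch and start a new one.
                batches++;
                accumulated = 0;
            }
        }
        if (accumulated > 0) {
            // Whatever is left over forms a final, partially filled batch.
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        int[] messages = new int[1000];
        java.util.Arrays.fill(messages, 1024); // 1000 messages of 1 KiB each

        System.out.println("4 MiB limit: " + countBatches(messages, 1024 * 1024 * 4) + " batch(es)");
        System.out.println("512 B limit: " + countBatches(messages, 512) + " batch(es)");
    }
}
```

With the 4 MiB limit the whole run fits into a single batch, while a limit below the message size degenerates into one task per message.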


