You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2023/01/17 07:08:58 UTC

[GitHub] [rocketmq] nowinkeyy commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071814465


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;

Review Comment:
   > This line seems redundant.
   
   Yes, it has been deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org