You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/06/10 01:06:32 UTC

[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4439: [ISSUE #4435] Code optimization for ConsumeQueue abstraction.

hzh0425 commented on code in PR #4439:
URL: https://github.com/apache/rocketmq/pull/4439#discussion_r894074875


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -321,53 +323,14 @@ public void start() throws Exception {
         if (this.getMessageStoreConfig().isDuplicationEnable()) {
             this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
         } else {
-            /**
-             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
-             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
-             * 3. Calculate the reput offset according to the consume queue;
-             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
-             */
-            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
-                for (ConsumeQueueInterface logic : maps.values()) {
-                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
-                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
-                    }
-                }
-            }
-            if (maxPhysicalPosInLogicQueue < 0) {
-                maxPhysicalPosInLogicQueue = 0;
-            }
-            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
-                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
-                /**
-                 * This happens in following conditions:
-                 * 1. If someone removes all the consumequeue files or the disk get damaged.
-                 * 2. Launch a new broker, and copy the commitlog from other brokers.
-                 *
-                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
-                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
-                 */
-                LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
-            }
-            LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
-                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
-            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+            // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
+            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());

Review Comment:
   Does this overlap with 'doRecheckReputOffsetFromCq' below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org