You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/08/08 07:51:59 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4797: [ISSUE #4771]In HA mode, if you do not truncate consumerOffsetMap, it may cause a bug

RongtongJin commented on code in PR #4797:
URL: https://github.com/apache/rocketmq/pull/4797#discussion_r939931107


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -168,6 +168,8 @@ public class DefaultMessageStore implements MessageStore {
     private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
         new ConcurrentHashMap<Integer, Long>(32);
 
+    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable;

Review Comment:
   There is no need to add a consumer offset map here. Perhaps we can modify the consummeQueueStore of the broker module through the register hook.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##########
@@ -81,7 +81,7 @@ public class TimerMessageStore {
     public static final int DAY_SECS = 24 * 3600;
     // The total days in the timer wheel when precision is 1000ms.
     // If the broker shutdown last more than the configured days, will cause message loss
-    public static final int TIMER_WHELL_TTL_DAY = 7;
+    public static final int TIMER_WHEEL_TTL_DAY = 7;

Review Comment:
   Modifications not related to this PR can be split



##########
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java:
##########
@@ -341,6 +343,50 @@ private void putConsumeQueue(final String topic, final int queueId, final Consum
         }
     }
 
+    public void truncateTimerConsumerOffset(TimerMessageStore timerMessageStore) {
+        if (this.consumeQueueTable.get(TimerMessageStore.TIMER_TOPIC) == null
+            || this.consumeQueueTable.get(TimerMessageStore.TIMER_TOPIC).get(0) == null) {
+            return;
+        }
+        ConsumeQueueInterface consumerQueue = this.consumeQueueTable.get(TimerMessageStore.TIMER_TOPIC).get(0);
+        if (timerMessageStore.getQueueOffset() > consumerQueue.getMaxOffsetInQueue()) {
+            timerMessageStore.setQueueOffset(consumerQueue.getMaxOffsetInQueue());
+        }
+    }

Review Comment:
   It would be better to handle timer consumer offset separately in consume queue store.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org