You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/03 05:44:35 UTC

[rocketmq] branch develop updated: [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 462802158 [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
462802158 is described below

commit 462802158830f3fb2c4b76d12d0817936b52988d
Author: pingww <pi...@gmail.com>
AuthorDate: Fri Feb 3 13:44:28 2023 +0800

    [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
    
    * [ISSUE #5965] Fix lmqTopicQueueTable initialization
    
    * [ISSUE #5965] Fix lmqTopicQueueTable initialization
---
 .../org/apache/rocketmq/store/queue/ConsumeQueueStore.java  |  1 +
 .../apache/rocketmq/store/queue/QueueOffsetAssigner.java    | 13 +++++++++++++
 2 files changed, 14 insertions(+)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 486e1b756..8b77f4942 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -375,6 +375,7 @@ public class ConsumeQueueStore {
 
     public void setTopicQueueTable(ConcurrentMap<String, Long> topicQueueTable) {
         this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
+        this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable);
     }
 
     public ConcurrentMap getTopicQueueTable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index d069aea67..fe8586f6d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -17,8 +17,11 @@
 
 package org.apache.rocketmq.store.queue;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -82,6 +85,16 @@ public class QueueOffsetAssigner {
         this.topicQueueTable = topicQueueTable;
     }
 
+    public void setLmqTopicQueueTable(ConcurrentMap<String, Long> lmqTopicQueueTable) {
+        ConcurrentMap<String, Long> table = new ConcurrentHashMap<String, Long>(1024);
+        for (Map.Entry<String, Long> entry : lmqTopicQueueTable.entrySet()) {
+            if (MixAll.isLmq(entry.getKey())) {
+                table.put(entry.getKey(), entry.getValue());
+            }
+        }
+        this.lmqTopicQueueTable = table;
+    }
+
     public ConcurrentMap<String, Long> getTopicQueueTable() {
         return topicQueueTable;
     }