You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/03 05:43:49 UTC

[rocketmq] branch 4.9.x updated: [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5967)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new fa38abf23 [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5967)
fa38abf23 is described below

commit fa38abf235be2e8ace629cfd8def2d6b79e27102
Author: pingww <pi...@gmail.com>
AuthorDate: Fri Feb 3 13:43:39 2023 +0800

    [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5967)
---
 .../src/main/java/org/apache/rocketmq/store/CommitLog.java | 14 ++++++++++++++
 .../org/apache/rocketmq/store/DefaultMessageStore.java     |  1 +
 2 files changed, 15 insertions(+)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index efed87f67..6cb9f0973 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -1015,6 +1016,19 @@ public class CommitLog {
         return this.lmqTopicQueueTable;
     }
 
+    public void setLmqTopicQueueTable(Map<String, Long> lmqTopicQueueTable) {
+        if (!defaultMessageStore.getMessageStoreConfig().isEnableLmq()) {
+            return;
+        }
+        Map<String, Long> table = new HashMap<String, Long>(1024);
+        for (Map.Entry<String, Long> entry : lmqTopicQueueTable.entrySet()) {
+            if (MixAll.isLmq(entry.getKey())) {
+                table.put(entry.getKey(), entry.getValue());
+            }
+        }
+        this.lmqTopicQueueTable = table;
+    }
+
     abstract class FlushCommitLogService extends ServiceThread {
         protected static final int RETRY_TIMES_OVER = 10;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8cbe690ea..5a091d37e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1481,6 +1481,7 @@ public class DefaultMessageStore implements MessageStore {
         }
 
         this.commitLog.setTopicQueueTable(table);
+        this.commitLog.setLmqTopicQueueTable(table);
     }
 
     public AllocateMappedFileService getAllocateMappedFileService() {