You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/03 05:44:35 UTC
[rocketmq] branch develop updated: [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 462802158 [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
462802158 is described below
commit 462802158830f3fb2c4b76d12d0817936b52988d
Author: pingww <pi...@gmail.com>
AuthorDate: Fri Feb 3 13:44:28 2023 +0800
[ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
* [ISSUE #5965] Fix lmqTopicQueueTable initialization
* [ISSUE #5965] Fix lmqTopicQueueTable initialization
---
.../org/apache/rocketmq/store/queue/ConsumeQueueStore.java | 1 +
.../apache/rocketmq/store/queue/QueueOffsetAssigner.java | 13 +++++++++++++
2 files changed, 14 insertions(+)
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 486e1b756..8b77f4942 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -375,6 +375,7 @@ public class ConsumeQueueStore {
public void setTopicQueueTable(ConcurrentMap<String, Long> topicQueueTable) {
this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
+ this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable);
}
public ConcurrentMap getTopicQueueTable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index d069aea67..fe8586f6d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -17,8 +17,11 @@
package org.apache.rocketmq.store.queue;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -82,6 +85,16 @@ public class QueueOffsetAssigner {
this.topicQueueTable = topicQueueTable;
}
+ public void setLmqTopicQueueTable(ConcurrentMap<String, Long> lmqTopicQueueTable) {
+ ConcurrentMap<String, Long> table = new ConcurrentHashMap<String, Long>(1024);
+ for (Map.Entry<String, Long> entry : lmqTopicQueueTable.entrySet()) {
+ if (MixAll.isLmq(entry.getKey())) {
+ table.put(entry.getKey(), entry.getValue());
+ }
+ }
+ this.lmqTopicQueueTable = table;
+ }
+
public ConcurrentMap<String, Long> getTopicQueueTable() {
return topicQueueTable;
}