You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/27 07:25:41 UTC

[rocketmq] branch develop updated: [ISSUE #6430] Scan topic.json to find compactionTopic and copy it

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 654c12ed5 [ISSUE #6430] Scan topic.json to find compactionTopic and copy it
654c12ed5 is described below

commit 654c12ed56224cabca06768d4f75be29baf9ea1f
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Mon Mar 27 15:25:29 2023 +0800

    [ISSUE #6430] Scan topic.json to find compactionTopic and copy it
    
    Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
 .../apache/rocketmq/store/DefaultMessageStore.java |   5 +
 .../org/apache/rocketmq/store/MessageStore.java    | 140 +++++++++++----------
 .../apache/rocketmq/store/kv/CompactionStore.java  |  48 +++++--
 .../store/plugin/AbstractPluginMessageStore.java   |   8 ++
 .../rocketmq/store/queue/ConsumeQueueStore.java    |   6 +-
 5 files changed, 135 insertions(+), 72 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 403cb9ad1..dcdae008c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2049,6 +2049,11 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
+    @Override
+    public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+        return this.consumeQueueStore.getTopicConfigs();
+    }
+
     @Override
     public Optional<TopicConfig> getTopicConfig(String topic) {
         return this.consumeQueueStore.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 8e86f8abe..f77739fc4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -20,14 +20,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
 import io.opentelemetry.sdk.metrics.View;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
+
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.TopicConfig;
@@ -116,11 +119,11 @@ public interface MessageStore {
      * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
      * from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
      *
-     * @param group Consumer group that launches this query.
-     * @param topic Topic to query.
-     * @param queueId Queue ID to query.
-     * @param offset Logical offset to start from.
-     * @param maxMsgNums Maximum count of messages to query.
+     * @param group         Consumer group that launches this query.
+     * @param topic         Topic to query.
+     * @param queueId       Queue ID to query.
+     * @param offset        Logical offset to start from.
+     * @param maxMsgNums    Maximum count of messages to query.
      * @param messageFilter Message filter used to screen desired messages.
      * @return Matched messages.
      */
@@ -131,11 +134,11 @@ public interface MessageStore {
      * Asynchronous get message
      * @see #getMessage(String, String, int, long, int, MessageFilter) getMessage
      *
-     * @param group Consumer group that launches this query.
-     * @param topic Topic to query.
-     * @param queueId Queue ID to query.
-     * @param offset Logical offset to start from.
-     * @param maxMsgNums Maximum count of messages to query.
+     * @param group         Consumer group that launches this query.
+     * @param topic         Topic to query.
+     * @param queueId       Queue ID to query.
+     * @param offset        Logical offset to start from.
+     * @param maxMsgNums    Maximum count of messages to query.
      * @param messageFilter Message filter used to screen desired messages.
      * @return Matched messages.
      */
@@ -146,13 +149,13 @@ public interface MessageStore {
      * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
      * from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
      *
-     * @param group Consumer group that launches this query.
-     * @param topic Topic to query.
-     * @param queueId Queue ID to query.
-     * @param offset Logical offset to start from.
-     * @param maxMsgNums Maximum count of messages to query.
+     * @param group           Consumer group that launches this query.
+     * @param topic           Topic to query.
+     * @param queueId         Queue ID to query.
+     * @param offset          Logical offset to start from.
+     * @param maxMsgNums      Maximum count of messages to query.
      * @param maxTotalMsgSize Maximum total msg size of the messages
-     * @param messageFilter Message filter used to screen desired messages.
+     * @param messageFilter   Message filter used to screen desired messages.
      * @return Matched messages.
      */
     GetMessageResult getMessage(final String group, final String topic, final int queueId,
@@ -162,13 +165,13 @@ public interface MessageStore {
      * Asynchronous get message
      * @see #getMessage(String, String, int, long, int, int, MessageFilter) getMessage
      *
-     * @param group Consumer group that launches this query.
-     * @param topic Topic to query.
-     * @param queueId Queue ID to query.
-     * @param offset Logical offset to start from.
-     * @param maxMsgNums Maximum count of messages to query.
+     * @param group           Consumer group that launches this query.
+     * @param topic           Topic to query.
+     * @param queueId         Queue ID to query.
+     * @param offset          Logical offset to start from.
+     * @param maxMsgNums      Maximum count of messages to query.
      * @param maxTotalMsgSize Maximum total msg size of the messages
-     * @param messageFilter Message filter used to screen desired messages.
+     * @param messageFilter   Message filter used to screen desired messages.
      * @return Matched messages.
      */
     CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
@@ -177,7 +180,7 @@ public interface MessageStore {
     /**
      * Get maximum offset of the topic queue.
      *
-     * @param topic Topic name.
+     * @param topic   Topic name.
      * @param queueId Queue ID.
      * @return Maximum offset at present.
      */
@@ -186,8 +189,8 @@ public interface MessageStore {
     /**
      * Get maximum offset of the topic queue.
      *
-     * @param topic Topic name.
-     * @param queueId Queue ID.
+     * @param topic     Topic name.
+     * @param queueId   Queue ID.
      * @param committed return the max offset in ConsumeQueue if true, or the max offset in CommitLog if false
      * @return Maximum offset at present.
      */
@@ -196,7 +199,7 @@ public interface MessageStore {
     /**
      * Get the minimum offset of the topic queue.
      *
-     * @param topic Topic name.
+     * @param topic   Topic name.
      * @param queueId Queue ID.
      * @return Minimum offset at present.
      */
@@ -209,8 +212,8 @@ public interface MessageStore {
     /**
      * Get the offset of the message in the commit log, which is also known as physical offset.
      *
-     * @param topic Topic of the message to lookup.
-     * @param queueId Queue ID.
+     * @param topic              Topic of the message to lookup.
+     * @param queueId            Queue ID.
      * @param consumeQueueOffset offset of consume queue.
      * @return physical offset.
      */
@@ -219,8 +222,8 @@ public interface MessageStore {
     /**
      * Look up the physical offset of the message whose store timestamp is as specified.
      *
-     * @param topic Topic of the message.
-     * @param queueId Queue ID.
+     * @param topic     Topic of the message.
+     * @param queueId   Queue ID.
      * @param timestamp Timestamp to look up.
      * @return physical offset which matches.
      */
@@ -238,7 +241,7 @@ public interface MessageStore {
      * Look up the message by given commit log offset and size.
      *
      * @param commitLogOffset physical offset.
-     * @param size message size
+     * @param size            message size
      * @return Message whose physical offset is as specified.
      */
     MessageExt lookMessageByOffset(long commitLogOffset, int size);
@@ -255,7 +258,7 @@ public interface MessageStore {
      * Get one message from the specified commit log offset.
      *
      * @param commitLogOffset commit log offset.
-     * @param msgSize message size.
+     * @param msgSize         message size.
      * @return wrapped result of the message.
      */
     SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
@@ -266,7 +269,9 @@ public interface MessageStore {
      * @return message store running info.
      */
     String getRunningDataInfo();
+
     long getTimingMessageCount(String topic);
+
     /**
      * Message store runtime information, which should generally contains various statistical information.
      *
@@ -297,7 +302,7 @@ public interface MessageStore {
     /**
      * Get the store time of the earliest message in the given queue.
      *
-     * @param topic Topic of the messages to query.
+     * @param topic   Topic of the messages to query.
      * @param queueId Queue ID to find.
      * @return store time of the earliest message.
      */
@@ -321,8 +326,8 @@ public interface MessageStore {
     /**
      * Get the store time of the message specified.
      *
-     * @param topic message topic.
-     * @param queueId queue ID.
+     * @param topic              message topic.
+     * @param queueId            queue ID.
      * @param consumeQueueOffset consume queue offset.
      * @return store timestamp of the message.
      */
@@ -332,8 +337,8 @@ public interface MessageStore {
      * Asynchronous get the store time of the message specified.
      * @see #getMessageStoreTimeStamp(String, int, long) getMessageStoreTimeStamp
      *
-     * @param topic message topic.
-     * @param queueId queue ID.
+     * @param topic              message topic.
+     * @param queueId            queue ID.
      * @param consumeQueueOffset consume queue offset.
      * @return store timestamp of the message.
      */
@@ -343,7 +348,7 @@ public interface MessageStore {
     /**
      * Get the total number of the messages in the specified queue.
      *
-     * @param topic Topic
+     * @param topic   Topic
      * @param queueId Queue ID.
      * @return total number.
      */
@@ -361,7 +366,7 @@ public interface MessageStore {
      * Get the raw commit log data starting from the given offset, across multiple mapped files.
      *
      * @param offset starting offset.
-     * @param size size of data to get
+     * @param size   size of data to get
      * @return commit log data.
      */
     List<SelectMappedBufferResult> getBulkCommitLogData(final long offset, final int size);
@@ -370,9 +375,9 @@ public interface MessageStore {
      * Append data to commit log.
      *
      * @param startOffset starting offset.
-     * @param data data to append.
-     * @param dataStart the start index of data array
-     * @param dataLength the length of data array
+     * @param data        data to append.
+     * @param dataStart   the start index of data array
+     * @param dataLength  the length of data array
      * @return true if success; false otherwise.
      */
     boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
@@ -385,11 +390,11 @@ public interface MessageStore {
     /**
      * Query messages by given key.
      *
-     * @param topic topic of the message.
-     * @param key message key.
+     * @param topic  topic of the message.
+     * @param key    message key.
      * @param maxNum maximum number of the messages possible.
-     * @param begin begin timestamp.
-     * @param end end timestamp.
+     * @param begin  begin timestamp.
+     * @param end    end timestamp.
      */
     QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
         final long end);
@@ -398,11 +403,11 @@ public interface MessageStore {
      * Asynchronous query messages by given key.
      * @see #queryMessage(String, String, int, long, long) queryMessage
      *
-     * @param topic topic of the message.
-     * @param key message key.
+     * @param topic  topic of the message.
+     * @param key    message key.
      * @param maxNum maximum number of the messages possible.
-     * @param begin begin timestamp.
-     * @param end end timestamp.
+     * @param begin  begin timestamp.
+     * @param end    end timestamp.
      */
     CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, final String key, final int maxNum,
         final long begin, final long end);
@@ -460,11 +465,11 @@ public interface MessageStore {
     /**
      * Check if the given message has been swapped out of the memory.
      *
-     * @param topic topic.
-     * @param queueId queue ID.
+     * @param topic         topic.
+     * @param queueId       queue ID.
      * @param consumeOffset consume queue offset.
      * @return true if the message is no longer in memory; false otherwise.
-     * @deprecated  As of RIP-57, replaced by {@link #checkInMemByConsumeOffset(String, int, long, int)}, see <a href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more details
+     * @deprecated As of RIP-57, replaced by {@link #checkInMemByConsumeOffset(String, int, long, int)}, see <a href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more details
      */
     @Deprecated
     boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
@@ -570,7 +575,7 @@ public interface MessageStore {
     /**
      * Get consume queue of the topic/queue. If consume queue not exist, will return null
      *
-     * @param topic Topic.
+     * @param topic   Topic.
      * @param queueId Queue ID.
      * @return Consume queue.
      */
@@ -578,7 +583,7 @@ public interface MessageStore {
 
     /**
      * Get consume queue of the topic/queue. If consume queue not exist, will create one then return it.
-     * @param topic Topic.
+     * @param topic   Topic.
      * @param queueId Queue ID.
      * @return Consume queue.
      */
@@ -594,8 +599,8 @@ public interface MessageStore {
     /**
      * Will be triggered when a new message is appended to commit log.
      *
-     * @param msg the msg that is appended to commit log
-     * @param result append message result
+     * @param msg           the msg that is appended to commit log
+     * @param result        append message result
      * @param commitLogFile commit log file
      */
     void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile);
@@ -604,10 +609,10 @@ public interface MessageStore {
      * Will be triggered when a new dispatch request is sent to message store.
      *
      * @param dispatchRequest dispatch request
-     * @param doDispatch do dispatch if true
-     * @param commitLogFile commit log file
-     * @param isRecover is from recover process
-     * @param isFileEnd if the dispatch request represents 'file end'
+     * @param doDispatch      do dispatch if true
+     * @param commitLogFile   commit log file
+     * @param isRecover       is from recover process
+     * @param isFileEnd       if the dispatch request represents 'file end'
      */
     void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile,
         boolean isRecover, boolean isFileEnd);
@@ -726,11 +731,18 @@ public interface MessageStore {
      * Assign an queue offset and increase it. If there is a race condition, you need to lock/unlock this method
      * yourself.
      *
-     * @param msg message
+     * @param msg        message
      * @param messageNum message num
      */
     void assignOffset(MessageExtBrokerInner msg, short messageNum);
 
+    /**
+     * get all topic config
+     *
+     * @return all topic config info
+     */
+    Map<String, TopicConfig> getTopicConfigs();
+
     /**
      * get topic config
      *
@@ -814,7 +826,7 @@ public interface MessageStore {
      * Calculate the checksum of a certain range of data.
      *
      * @param from begin offset
-     * @param to end offset
+     * @param to   end offset
      * @return checksum
      */
     byte[] calcDeltaChecksum(long from, long to);
@@ -956,7 +968,7 @@ public interface MessageStore {
     /**
      * Init store metrics
      *
-     * @param meter opentelemetry meter
+     * @param meter                     opentelemetry meter
      * @param attributesBuilderSupplier metrics attributes builder
      */
     void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier);
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b4487753f..1142c8153 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,8 +16,16 @@
  */
 package org.apache.rocketmq.store.kv;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CleanupPolicy;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -29,7 +37,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -48,6 +55,7 @@ public class CompactionStore {
     private final CompactionPositionMgr positionMgr;
     private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
     private final ScheduledExecutorService compactionSchedule;
+    private final int scanInterval = 30000;
     private final int compactionInterval;
     private final int compactionThreadNum;
     private final int offsetMapSize;
@@ -96,10 +104,7 @@ public class CompactionStore {
                             int queueId = Integer.parseInt(fileQueueId.getName());
 
                             if (Files.isDirectory(Paths.get(compactionCqPath, topic, String.valueOf(queueId)))) {
-                                CompactionLog log = new CompactionLog(defaultMessageStore, this, topic, queueId);
-                                log.load(exitOk);
-                                compactionLogTable.put(topic + "_" + queueId, log);
-                                compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
+                                loadAndGetClog(topic, queueId);
                             } else {
                                 log.error("{}:{} compactionLog mismatch with compactionCq", topic, queueId);
                             }
@@ -114,13 +119,37 @@ public class CompactionStore {
             }
         }
         log.info("compactionStore {}:{} load completed.", compactionLogPath, compactionCqPath);
+
+        compactionSchedule.scheduleWithFixedDelay(this::scanAllTopicConfig, scanInterval, scanInterval, TimeUnit.MILLISECONDS);
+        log.info("loop to scan all topicConfig with fixed delay {}ms", scanInterval);
     }
 
-    public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
+    private void scanAllTopicConfig() {
+        log.info("start to scan all topicConfig");
+        try {
+            Iterator<Map.Entry<String, TopicConfig>> iterator = defaultMessageStore.getTopicConfigs().entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, TopicConfig> it = iterator.next();
+                TopicConfig topicConfig = it.getValue();
+                CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(Optional.ofNullable(topicConfig));
+                //check topic flag
+                if (Objects.equals(policy, CleanupPolicy.COMPACTION)) {
+                    for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
+                        loadAndGetClog(it.getKey(), queueId);
+                    }
+                }
+            }
+        } catch (Throwable ignore) {
+            // ignore
+        }
+        log.info("scan all topicConfig over");
+    }
+
+    private CompactionLog loadAndGetClog(String topic, int queueId) {
         CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
             if (v == null) {
                 try {
-                    v = new CompactionLog(defaultMessageStore,this, topic, queueId);
+                    v = new CompactionLog(defaultMessageStore, this, topic, queueId);
                     v.load(true);
                     compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
                 } catch (IOException e) {
@@ -130,6 +159,11 @@ public class CompactionStore {
             }
             return v;
         });
+        return clog;
+    }
+
+    public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
+        CompactionLog clog = loadAndGetClog(topic, queueId);
 
         if (clog != null) {
             clog.asyncPutMessage(smr);
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 77908c5fa..3f43adc12 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -21,14 +21,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
 import io.opentelemetry.sdk.metrics.View;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
+
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.TopicConfig;
@@ -592,6 +595,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
         next.assignOffset(msg, messageNum);
     }
 
+    @Override
+    public Map<String, TopicConfig> getTopicConfigs() {
+        return next.getTopicConfigs();
+    }
+
     @Override
     public Optional<TopicConfig> getTopicConfig(String topic) {
         return next.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8b77f4942..90f2e74aa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -95,7 +95,7 @@ public class ConsumeQueueStore {
      * Apply the dispatched request and build the consume queue. This function should be idempotent.
      *
      * @param consumeQueue consume queue
-     * @param request dispatch request
+     * @param request      dispatch request
      */
     public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
         consumeQueue.putMessagePositionInfoWrapper(request);
@@ -537,6 +537,10 @@ public class ConsumeQueueStore {
         }
     }
 
+    public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+        return this.topicConfigTable;
+    }
+
     public Optional<TopicConfig> getTopicConfig(String topic) {
         if (this.topicConfigTable == null) {
             return Optional.empty();