You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/27 07:25:41 UTC
[rocketmq] branch develop updated: [ISSUE #6430] Scan topic.json to find compactionTopic and copy it
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 654c12ed5 [ISSUE #6430] Scan topic.json to find compactionTopic and copy it
654c12ed5 is described below
commit 654c12ed56224cabca06768d4f75be29baf9ea1f
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Mon Mar 27 15:25:29 2023 +0800
[ISSUE #6430] Scan topic.json to find compactionTopic and copy it
Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
.../apache/rocketmq/store/DefaultMessageStore.java | 5 +
.../org/apache/rocketmq/store/MessageStore.java | 140 +++++++++++----------
.../apache/rocketmq/store/kv/CompactionStore.java | 48 +++++--
.../store/plugin/AbstractPluginMessageStore.java | 8 ++
.../rocketmq/store/queue/ConsumeQueueStore.java | 6 +-
5 files changed, 135 insertions(+), 72 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 403cb9ad1..dcdae008c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2049,6 +2049,11 @@ public class DefaultMessageStore implements MessageStore {
}
}
+ @Override
+ public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+ return this.consumeQueueStore.getTopicConfigs();
+ }
+
@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return this.consumeQueueStore.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 8e86f8abe..f77739fc4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -20,14 +20,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
@@ -116,11 +119,11 @@ public interface MessageStore {
* Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
@@ -131,11 +134,11 @@ public interface MessageStore {
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, MessageFilter) getMessage
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
@@ -146,13 +149,13 @@ public interface MessageStore {
* Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param maxTotalMsgSize Maximum total msg size of the messages
- * @param messageFilter Message filter used to screen desired messages.
+ * @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
GetMessageResult getMessage(final String group, final String topic, final int queueId,
@@ -162,13 +165,13 @@ public interface MessageStore {
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, int, MessageFilter) getMessage
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param maxTotalMsgSize Maximum total msg size of the messages
- * @param messageFilter Message filter used to screen desired messages.
+ * @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
@@ -177,7 +180,7 @@ public interface MessageStore {
/**
* Get maximum offset of the topic queue.
*
- * @param topic Topic name.
+ * @param topic Topic name.
* @param queueId Queue ID.
* @return Maximum offset at present.
*/
@@ -186,8 +189,8 @@ public interface MessageStore {
/**
* Get maximum offset of the topic queue.
*
- * @param topic Topic name.
- * @param queueId Queue ID.
+ * @param topic Topic name.
+ * @param queueId Queue ID.
* @param committed return the max offset in ConsumeQueue if true, or the max offset in CommitLog if false
* @return Maximum offset at present.
*/
@@ -196,7 +199,7 @@ public interface MessageStore {
/**
* Get the minimum offset of the topic queue.
*
- * @param topic Topic name.
+ * @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
*/
@@ -209,8 +212,8 @@ public interface MessageStore {
/**
* Get the offset of the message in the commit log, which is also known as physical offset.
*
- * @param topic Topic of the message to lookup.
- * @param queueId Queue ID.
+ * @param topic Topic of the message to lookup.
+ * @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
* @return physical offset.
*/
@@ -219,8 +222,8 @@ public interface MessageStore {
/**
* Look up the physical offset of the message whose store timestamp is as specified.
*
- * @param topic Topic of the message.
- * @param queueId Queue ID.
+ * @param topic Topic of the message.
+ * @param queueId Queue ID.
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
@@ -238,7 +241,7 @@ public interface MessageStore {
* Look up the message by given commit log offset and size.
*
* @param commitLogOffset physical offset.
- * @param size message size
+ * @param size message size
* @return Message whose physical offset is as specified.
*/
MessageExt lookMessageByOffset(long commitLogOffset, int size);
@@ -255,7 +258,7 @@ public interface MessageStore {
* Get one message from the specified commit log offset.
*
* @param commitLogOffset commit log offset.
- * @param msgSize message size.
+ * @param msgSize message size.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
@@ -266,7 +269,9 @@ public interface MessageStore {
* @return message store running info.
*/
String getRunningDataInfo();
+
long getTimingMessageCount(String topic);
+
/**
* Message store runtime information, which should generally contains various statistical information.
*
@@ -297,7 +302,7 @@ public interface MessageStore {
/**
* Get the store time of the earliest message in the given queue.
*
- * @param topic Topic of the messages to query.
+ * @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
*/
@@ -321,8 +326,8 @@ public interface MessageStore {
/**
* Get the store time of the message specified.
*
- * @param topic message topic.
- * @param queueId queue ID.
+ * @param topic message topic.
+ * @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
@@ -332,8 +337,8 @@ public interface MessageStore {
* Asynchronous get the store time of the message specified.
* @see #getMessageStoreTimeStamp(String, int, long) getMessageStoreTimeStamp
*
- * @param topic message topic.
- * @param queueId queue ID.
+ * @param topic message topic.
+ * @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
@@ -343,7 +348,7 @@ public interface MessageStore {
/**
* Get the total number of the messages in the specified queue.
*
- * @param topic Topic
+ * @param topic Topic
* @param queueId Queue ID.
* @return total number.
*/
@@ -361,7 +366,7 @@ public interface MessageStore {
* Get the raw commit log data starting from the given offset, across multiple mapped files.
*
* @param offset starting offset.
- * @param size size of data to get
+ * @param size size of data to get
* @return commit log data.
*/
List<SelectMappedBufferResult> getBulkCommitLogData(final long offset, final int size);
@@ -370,9 +375,9 @@ public interface MessageStore {
* Append data to commit log.
*
* @param startOffset starting offset.
- * @param data data to append.
- * @param dataStart the start index of data array
- * @param dataLength the length of data array
+ * @param data data to append.
+ * @param dataStart the start index of data array
+ * @param dataLength the length of data array
* @return true if success; false otherwise.
*/
boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
@@ -385,11 +390,11 @@ public interface MessageStore {
/**
* Query messages by given key.
*
- * @param topic topic of the message.
- * @param key message key.
+ * @param topic topic of the message.
+ * @param key message key.
* @param maxNum maximum number of the messages possible.
- * @param begin begin timestamp.
- * @param end end timestamp.
+ * @param begin begin timestamp.
+ * @param end end timestamp.
*/
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);
@@ -398,11 +403,11 @@ public interface MessageStore {
* Asynchronous query messages by given key.
* @see #queryMessage(String, String, int, long, long) queryMessage
*
- * @param topic topic of the message.
- * @param key message key.
+ * @param topic topic of the message.
+ * @param key message key.
* @param maxNum maximum number of the messages possible.
- * @param begin begin timestamp.
- * @param end end timestamp.
+ * @param begin begin timestamp.
+ * @param end end timestamp.
*/
CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, final String key, final int maxNum,
final long begin, final long end);
@@ -460,11 +465,11 @@ public interface MessageStore {
/**
* Check if the given message has been swapped out of the memory.
*
- * @param topic topic.
- * @param queueId queue ID.
+ * @param topic topic.
+ * @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
- * @deprecated As of RIP-57, replaced by {@link #checkInMemByConsumeOffset(String, int, long, int)}, see <a href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more details
+ * @deprecated As of RIP-57, replaced by {@link #checkInMemByConsumeOffset(String, int, long, int)}, see <a href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more details
*/
@Deprecated
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
@@ -570,7 +575,7 @@ public interface MessageStore {
/**
* Get consume queue of the topic/queue. If consume queue not exist, will return null
*
- * @param topic Topic.
+ * @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
@@ -578,7 +583,7 @@ public interface MessageStore {
/**
* Get consume queue of the topic/queue. If consume queue not exist, will create one then return it.
- * @param topic Topic.
+ * @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
@@ -594,8 +599,8 @@ public interface MessageStore {
/**
* Will be triggered when a new message is appended to commit log.
*
- * @param msg the msg that is appended to commit log
- * @param result append message result
+ * @param msg the msg that is appended to commit log
+ * @param result append message result
* @param commitLogFile commit log file
*/
void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile);
@@ -604,10 +609,10 @@ public interface MessageStore {
* Will be triggered when a new dispatch request is sent to message store.
*
* @param dispatchRequest dispatch request
- * @param doDispatch do dispatch if true
- * @param commitLogFile commit log file
- * @param isRecover is from recover process
- * @param isFileEnd if the dispatch request represents 'file end'
+ * @param doDispatch do dispatch if true
+ * @param commitLogFile commit log file
+ * @param isRecover is from recover process
+ * @param isFileEnd if the dispatch request represents 'file end'
*/
void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile,
boolean isRecover, boolean isFileEnd);
@@ -726,11 +731,18 @@ public interface MessageStore {
* Assign an queue offset and increase it. If there is a race condition, you need to lock/unlock this method
* yourself.
*
- * @param msg message
+ * @param msg message
* @param messageNum message num
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);
+ /**
+ * get all topic config
+ *
+ * @return all topic config info
+ */
+ Map<String, TopicConfig> getTopicConfigs();
+
/**
* get topic config
*
@@ -814,7 +826,7 @@ public interface MessageStore {
* Calculate the checksum of a certain range of data.
*
* @param from begin offset
- * @param to end offset
+ * @param to end offset
* @return checksum
*/
byte[] calcDeltaChecksum(long from, long to);
@@ -956,7 +968,7 @@ public interface MessageStore {
/**
* Init store metrics
*
- * @param meter opentelemetry meter
+ * @param meter opentelemetry meter
* @param attributesBuilderSupplier metrics attributes builder
*/
void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier);
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b4487753f..1142c8153 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,8 +16,16 @@
*/
package org.apache.rocketmq.store.kv;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.GetMessageResult;
@@ -29,7 +37,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -48,6 +55,7 @@ public class CompactionStore {
private final CompactionPositionMgr positionMgr;
private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
private final ScheduledExecutorService compactionSchedule;
+ private final int scanInterval = 30000;
private final int compactionInterval;
private final int compactionThreadNum;
private final int offsetMapSize;
@@ -96,10 +104,7 @@ public class CompactionStore {
int queueId = Integer.parseInt(fileQueueId.getName());
if (Files.isDirectory(Paths.get(compactionCqPath, topic, String.valueOf(queueId)))) {
- CompactionLog log = new CompactionLog(defaultMessageStore, this, topic, queueId);
- log.load(exitOk);
- compactionLogTable.put(topic + "_" + queueId, log);
- compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
+ loadAndGetClog(topic, queueId);
} else {
log.error("{}:{} compactionLog mismatch with compactionCq", topic, queueId);
}
@@ -114,13 +119,37 @@ public class CompactionStore {
}
}
log.info("compactionStore {}:{} load completed.", compactionLogPath, compactionCqPath);
+
+ compactionSchedule.scheduleWithFixedDelay(this::scanAllTopicConfig, scanInterval, scanInterval, TimeUnit.MILLISECONDS);
+ log.info("loop to scan all topicConfig with fixed delay {}ms", scanInterval);
}
- public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
+ private void scanAllTopicConfig() {
+ log.info("start to scan all topicConfig");
+ try {
+ Iterator<Map.Entry<String, TopicConfig>> iterator = defaultMessageStore.getTopicConfigs().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, TopicConfig> it = iterator.next();
+ TopicConfig topicConfig = it.getValue();
+ CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(Optional.ofNullable(topicConfig));
+ //check topic flag
+ if (Objects.equals(policy, CleanupPolicy.COMPACTION)) {
+ for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
+ loadAndGetClog(it.getKey(), queueId);
+ }
+ }
+ }
+ } catch (Throwable ignore) {
+ // ignore
+ }
+ log.info("scan all topicConfig over");
+ }
+
+ private CompactionLog loadAndGetClog(String topic, int queueId) {
CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
if (v == null) {
try {
- v = new CompactionLog(defaultMessageStore,this, topic, queueId);
+ v = new CompactionLog(defaultMessageStore, this, topic, queueId);
v.load(true);
compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
} catch (IOException e) {
@@ -130,6 +159,11 @@ public class CompactionStore {
}
return v;
});
+ return clog;
+ }
+
+ public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
+ CompactionLog clog = loadAndGetClog(topic, queueId);
if (clog != null) {
clog.asyncPutMessage(smr);
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 77908c5fa..3f43adc12 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -21,14 +21,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
@@ -592,6 +595,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
next.assignOffset(msg, messageNum);
}
+ @Override
+ public Map<String, TopicConfig> getTopicConfigs() {
+ return next.getTopicConfigs();
+ }
+
@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return next.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8b77f4942..90f2e74aa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -95,7 +95,7 @@ public class ConsumeQueueStore {
* Apply the dispatched request and build the consume queue. This function should be idempotent.
*
* @param consumeQueue consume queue
- * @param request dispatch request
+ * @param request dispatch request
*/
public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
consumeQueue.putMessagePositionInfoWrapper(request);
@@ -537,6 +537,10 @@ public class ConsumeQueueStore {
}
}
+ public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+ return this.topicConfigTable;
+ }
+
public Optional<TopicConfig> getTopicConfig(String topic) {
if (this.topicConfigTable == null) {
return Optional.empty();