You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/01/12 11:03:01 UTC
[rocketmq] branch develop updated: [ISSUE #5863] implement tiered store metadata (#5864)
This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cbad53d60 [ISSUE #5863] implement tiered store metadata (#5864)
cbad53d60 is described below
commit cbad53d60d7a766d5ca290d45d28ed648e1a34b6
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Thu Jan 12 19:02:40 2023 +0800
[ISSUE #5863] implement tiered store metadata (#5864)
* implement tiered store metadata
* fix bazel
---
pom.xml | 1 +
tieredstore/BUILD.bazel | 55 +++++++
tieredstore/pom.xml | 46 ++++++
.../tiered/common/TieredMessageStoreConfig.java | 31 ++++
.../store/tiered/metadata/FileSegmentMetadata.java | 130 ++++++++++++++++
.../store/tiered/metadata/QueueMetadata.java | 70 +++++++++
.../metadata/TieredStoreMetadataManager.java | 167 +++++++++++++++++++++
.../TieredStoreMetadataSerializeWrapper.java | 54 +++++++
.../tiered/metadata/TieredStoreMetadataStore.java | 38 +++++
.../store/tiered/metadata/TopicMetadata.java | 77 ++++++++++
.../store/tiered/metadata/MetadataStoreTest.java | 142 ++++++++++++++++++
11 files changed, 811 insertions(+)
diff --git a/pom.xml b/pom.xml
index 040f8c5b6..9400633b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
<module>container</module>
<module>controller</module>
<module>proxy</module>
+ <module>tieredstore</module>
</modules>
<build>
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
new file mode 100644
index 000000000..5a0176e89
--- /dev/null
+++ b/tieredstore/BUILD.bazel
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+load("//bazel:GenTestRules.bzl", "GenTestRules")
+
+java_library(
+ name = "tieredstore",
+ srcs = glob(["src/main/java/**/*.java"]),
+ visibility = ["//visibility:public"],
+ deps = [
+ "//common",
+ "//remoting",
+ "//store",
+ "@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:org_apache_tomcat_annotations_api",
+ ],
+)
+
+java_library(
+ name = "tests",
+ srcs = glob(["src/test/java/**/*.java"]),
+ resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"]),
+ visibility = ["//visibility:public"],
+ deps = [
+ ":tieredstore",
+ "//:test_deps",
+ "//common",
+ "@maven//:commons_io_commons_io",
+ ],
+)
+
+GenTestRules(
+ name = "GeneratedTestRules",
+ exclude_tests = [
+ ],
+ medium_tests = [
+ ],
+ test_files = glob(["src/test/java/**/*Test.java"]),
+ deps = [
+ ":tests",
+ ],
+)
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
new file mode 100644
index 000000000..c983e3f08
--- /dev/null
+++ b/tieredstore/pom.xml
@@ -0,0 +1,46 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>5.0.1-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-tiered-store</artifactId>
+ <name>rocketmq-tiered-store ${project.version}</name>
+
+ <properties>
+ <project.root>${basedir}/..</project.root>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-store</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
new file mode 100644
index 000000000..c85317177
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.common;
+
+import java.io.File;
+
+public class TieredMessageStoreConfig {
+ private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+
+ public String getStorePathRootDir() {
+ return storePathRootDir;
+ }
+
+ public void setStorePathRootDir(String storePathRootDir) {
+ this.storePathRootDir = storePathRootDir;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java
new file mode 100644
index 000000000..d31de41bb
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class FileSegmentMetadata {
+ public static final int STATUS_NEW = 0;
+ public static final int STATUS_SEALED = 1;
+ public static final int STATUS_DELETED = 2;
+
+ private MessageQueue queue;
+ private int status;
+ private int type;
+ private long baseOffset;
+ private String path;
+ private long size;
+ private long createTimestamp;
+ private long beginTimestamp;
+ private long endTimestamp;
+ private long sealTimestamp;
+
+ // default constructor is used by fastjson
+ public FileSegmentMetadata() {
+
+ }
+
+ public FileSegmentMetadata(MessageQueue queue, int type, long baseOffset, String path) {
+ this.queue = queue;
+ this.status = STATUS_NEW;
+ this.type = type;
+ this.baseOffset = baseOffset;
+ this.path = path;
+ this.createTimestamp = System.currentTimeMillis();
+ }
+
+ public MessageQueue getQueue() {
+ return queue;
+ }
+
+ public void setQueue(MessageQueue queue) {
+ this.queue = queue;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public void setType(int type) {
+ this.type = type;
+ }
+
+ public long getBaseOffset() {
+ return baseOffset;
+ }
+
+ public void setBaseOffset(long baseOffset) {
+ this.baseOffset = baseOffset;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public long getCreateTimestamp() {
+ return createTimestamp;
+ }
+
+ public void setCreateTimestamp(long createTimestamp) {
+ this.createTimestamp = createTimestamp;
+ }
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ public void setBeginTimestamp(long beginTimestamp) {
+ this.beginTimestamp = beginTimestamp;
+ }
+
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ public void setEndTimestamp(long endTimestamp) {
+ this.endTimestamp = endTimestamp;
+ }
+
+ public long getSealTimestamp() {
+ return sealTimestamp;
+ }
+
+ public void setSealTimestamp(long sealTimestamp) {
+ this.sealTimestamp = sealTimestamp;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java
new file mode 100644
index 000000000..e156f6fc1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class QueueMetadata {
+ private MessageQueue queue;
+ private long minOffset;
+ private long maxOffset;
+ private long updateTimestamp;
+
+ // default constructor is used by fastjson
+ public QueueMetadata() {
+
+ }
+
+ public QueueMetadata(MessageQueue queue, long minOffset, long maxOffset) {
+ this.queue = queue;
+ this.minOffset = minOffset;
+ this.maxOffset = maxOffset;
+ this.updateTimestamp = System.currentTimeMillis();
+ }
+
+ public MessageQueue getQueue() {
+ return queue;
+ }
+
+ public void setQueue(MessageQueue queue) {
+ this.queue = queue;
+ }
+
+ public long getMinOffset() {
+ return minOffset;
+ }
+
+ public void setMinOffset(long minOffset) {
+ this.minOffset = minOffset;
+ }
+
+ public long getMaxOffset() {
+ return maxOffset;
+ }
+
+ public void setMaxOffset(long maxOffset) {
+ this.maxOffset = maxOffset;
+ }
+
+ public long getUpdateTimestamp() {
+ return updateTimestamp;
+ }
+
+ public void setUpdateTimestamp(long updateTimestamp) {
+ this.updateTimestamp = updateTimestamp;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
new file mode 100644
index 000000000..e4f241af1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+
+public class TieredStoreMetadataManager extends ConfigManager implements TieredStoreMetadataStore {
+ private final AtomicInteger maxTopicId = new AtomicInteger(0);
+ private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
+ private final TieredMessageStoreConfig storeConfig;
+
+ public TieredStoreMetadataManager(TieredMessageStoreConfig storeConfig) {
+ this.storeConfig = storeConfig;
+ }
+ @Override
+ public String encode() {
+ return encode(false);
+ }
+
+ @Override
+ public String encode(boolean prettyFormat) {
+ TieredStoreMetadataSerializeWrapper dataWrapper = new TieredStoreMetadataSerializeWrapper();
+ dataWrapper.setMaxTopicId(maxTopicId);
+ dataWrapper.setTopicMetadataTable(topicMetadataTable);
+ dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
+ return dataWrapper.toJson(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ TieredStoreMetadataSerializeWrapper dataWrapper =
+ TieredStoreMetadataSerializeWrapper.fromJson(jsonString, TieredStoreMetadataSerializeWrapper.class);
+ if (dataWrapper != null) {
+ maxTopicId.set(dataWrapper.getMaxTopicId().get());
+ topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
+ dataWrapper.getQueueMetadataTable()
+ .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
+ }
+ }
+ }
+
+ @Override
+ @Nullable
+ public TopicMetadata getTopic(String topic) {
+ return topicMetadataTable.get(topic);
+ }
+
+ @Override
+ public void iterateTopic(Consumer<TopicMetadata> callback) {
+ topicMetadataTable.values().forEach(callback);
+ }
+
+ @Override
+ public TopicMetadata addTopic(String topic, long reserveTime) {
+ TopicMetadata old = getTopic(topic);
+ if (old != null) {
+ return old;
+ }
+ TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
+ topicMetadataTable.put(topic, metadata);
+ return metadata;
+ }
+
+ @Override
+ public void updateTopicReserveTime(String topic, long reserveTime) {
+ TopicMetadata metadata = getTopic(topic);
+ if (metadata == null) {
+ return;
+ }
+ metadata.setReserveTime(reserveTime);
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ public void updateTopicStatus(String topic, int status) {
+ TopicMetadata metadata = getTopic(topic);
+ if (metadata == null) {
+ return;
+ }
+ metadata.setStatus(status);
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ topicMetadataTable.remove(topic);
+ }
+
+ @Override
+ @Nullable
+ public QueueMetadata getQueue(MessageQueue queue) {
+ if (!queueMetadataTable.containsKey(queue.getTopic())) {
+ return null;
+ }
+ return queueMetadataTable.get(queue.getTopic())
+ .get(queue.getQueueId());
+ }
+
+ @Override
+ public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
+ queueMetadataTable.get(topic)
+ .values()
+ .forEach(callback);
+ }
+
+ @Override
+ public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
+ QueueMetadata old = getQueue(queue);
+ if (old != null) {
+ return old;
+ }
+ QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
+ queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
+ .put(queue.getQueueId(), metadata);
+ return metadata;
+ }
+
+ @Override
+ public void updateQueue(QueueMetadata metadata) {
+ MessageQueue queue = metadata.getQueue();
+ if (queueMetadataTable.containsKey(queue.getTopic())) {
+ ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
+ if (metadataMap.containsKey(queue.getQueueId())) {
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ metadataMap.put(queue.getQueueId(), metadata);
+ }
+ }
+ }
+
+ @Override
+ public void deleteQueue(MessageQueue queue) {
+ if (queueMetadataTable.containsKey(queue.getTopic())) {
+ queueMetadataTable.get(queue.getTopic())
+ .remove(queue.getQueueId());
+ }
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
new file mode 100644
index 000000000..e4e068aff
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class TieredStoreMetadataSerializeWrapper extends RemotingSerializable {
+ private AtomicInteger maxTopicId;
+ private Map<String /*topic*/, TopicMetadata> topicMetadataTable;
+ private Map<String /*topic*/, Map<Integer /*queueId*/, QueueMetadata>> queueMetadataTable;
+
+
+ public AtomicInteger getMaxTopicId() {
+ return maxTopicId;
+ }
+
+ public void setMaxTopicId(AtomicInteger maxTopicId) {
+ this.maxTopicId = maxTopicId;
+ }
+
+ public Map<String, TopicMetadata> getTopicMetadataTable() {
+ return topicMetadataTable;
+ }
+
+ public void setTopicMetadataTable(
+ Map<String, TopicMetadata> topicMetadataTable) {
+ this.topicMetadataTable = topicMetadataTable;
+ }
+
+ public Map<String, Map<Integer, QueueMetadata>> getQueueMetadataTable() {
+ return queueMetadataTable;
+ }
+
+ public void setQueueMetadataTable(
+ Map<String, Map<Integer, QueueMetadata>> queueMetadataTable) {
+ this.queueMetadataTable = queueMetadataTable;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
new file mode 100644
index 000000000..7701258af
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface TieredStoreMetadataStore {
+ @Nullable
+ TopicMetadata getTopic(String topic);
+ void iterateTopic(Consumer<TopicMetadata> callback);
+ TopicMetadata addTopic(String topic, long reserveTime);
+ void updateTopicReserveTime(String topic, long reserveTime);
+ void updateTopicStatus(String topic, int status);
+ void deleteTopic(String topic);
+
+ @Nullable
+ QueueMetadata getQueue(MessageQueue queue);
+ void iterateQueue(String topic, Consumer<QueueMetadata> callback);
+ QueueMetadata addQueue(MessageQueue queue, long baseOffset);
+ void updateQueue(QueueMetadata metadata);
+ void deleteQueue(MessageQueue queue);
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java
new file mode 100644
index 000000000..37eca400e
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+public class TopicMetadata {
+ private int topicId;
+ String topic;
+ long reserveTime;
+ int status;
+ long updateTimestamp;
+
+ // default constructor is used by fastjson
+ public TopicMetadata() {
+
+ }
+
+ public TopicMetadata(int topicId, String topic, long reserveTime) {
+ this.topicId = topicId;
+ this.topic = topic;
+ this.reserveTime = reserveTime;
+ this.updateTimestamp = System.currentTimeMillis();
+ }
+
+ public int getTopicId() {
+ return topicId;
+ }
+
+ public void setTopicId(int topicId) {
+ this.topicId = topicId;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public long getReserveTime() {
+ return reserveTime;
+ }
+
+ public void setReserveTime(long reserveTime) {
+ this.reserveTime = reserveTime;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public long getUpdateTimestamp() {
+ return updateTimestamp;
+ }
+
+ public void setUpdateTimestamp(long updateTimestamp) {
+ this.updateTimestamp = updateTimestamp;
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
new file mode 100644
index 000000000..ff73c173a
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataStoreTest {
+ MessageQueue mq;
+ TieredMessageStoreConfig storeConfig;
+ TieredStoreMetadataStore metadataStore;
+
+ @Before
+ public void setUp() {
+ mq = new MessageQueue("MetadataStoreTest", "broker", 1);
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir("/tmp/rmqut");
+ metadataStore = new TieredStoreMetadataManager(storeConfig);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.deleteDirectory(new File("/tmp/rmqut"));
+ }
+
+ @Test
+ public void testQueue() {
+ QueueMetadata queueMetadata = metadataStore.getQueue(mq);
+ Assert.assertNull(queueMetadata);
+
+ queueMetadata = metadataStore.addQueue(mq, -1);
+ Assert.assertEquals(queueMetadata.getMinOffset(), -1);
+ Assert.assertEquals(queueMetadata.getMaxOffset(), -1);
+
+ long currentTimeMillis = System.currentTimeMillis();
+ queueMetadata.setMinOffset(0);
+ queueMetadata.setMaxOffset(0);
+ metadataStore.updateQueue(queueMetadata);
+ queueMetadata = metadataStore.getQueue(mq);
+ Assert.assertTrue(queueMetadata.getUpdateTimestamp() >= currentTimeMillis);
+ Assert.assertEquals(queueMetadata.getMinOffset(), 0);
+ Assert.assertEquals(queueMetadata.getMaxOffset(), 0);
+
+ MessageQueue mq2 = new MessageQueue("MetadataStoreTest", "broker", 2);
+ metadataStore.addQueue(mq2, 1);
+ AtomicInteger i = new AtomicInteger(0);
+ metadataStore.iterateQueue(mq.getTopic(), metadata -> {
+ Assert.assertEquals(i.get(), metadata.getMinOffset());
+ i.getAndIncrement();
+ });
+ Assert.assertEquals(i.get(), 2);
+
+ metadataStore.deleteQueue(mq);
+ queueMetadata = metadataStore.getQueue(mq);
+ Assert.assertNull(queueMetadata);
+ }
+
+ @Test
+ public void testTopic() {
+ TopicMetadata topicMetadata = metadataStore.getTopic(mq.getTopic());
+ Assert.assertNull(topicMetadata);
+
+ metadataStore.addTopic(mq.getTopic(), 2);
+ topicMetadata = metadataStore.getTopic(mq.getTopic());
+ Assert.assertEquals(mq.getTopic(), topicMetadata.getTopic());
+ Assert.assertEquals(topicMetadata.getStatus(), 0);
+ Assert.assertEquals(topicMetadata.getReserveTime(), 2);
+ Assert.assertEquals(topicMetadata.getTopicId(), 0);
+
+ metadataStore.updateTopicStatus(mq.getTopic(), 1);
+ metadataStore.updateTopicReserveTime(mq.getTopic(), 0);
+ topicMetadata = metadataStore.getTopic(mq.getTopic());
+ Assert.assertNotNull(topicMetadata);
+ Assert.assertEquals(topicMetadata.getStatus(), 1);
+ Assert.assertEquals(topicMetadata.getReserveTime(), 0);
+
+ metadataStore.addTopic(mq.getTopic() + "1", 1);
+ metadataStore.updateTopicStatus(mq.getTopic() + "1", 2);
+
+ metadataStore.addTopic(mq.getTopic() + "2", 2);
+ metadataStore.updateTopicStatus(mq.getTopic() + "2", 3);
+
+ AtomicInteger n = new AtomicInteger();
+ metadataStore.iterateTopic(metadata -> {
+ long i = metadata.getReserveTime();
+ Assert.assertEquals(metadata.getTopicId(), i);
+ Assert.assertEquals(metadata.getStatus(), i + 1);
+ if (i == 2) {
+ metadataStore.deleteTopic(metadata.getTopic());
+ }
+ n.getAndIncrement();
+ });
+ Assert.assertEquals(3, n.get());
+
+ Assert.assertNull(metadataStore.getTopic(mq.getTopic() + "2"));
+
+ Assert.assertNotNull(metadataStore.getTopic(mq.getTopic()));
+ Assert.assertNotNull(metadataStore.getTopic(mq.getTopic() + "1"));
+ }
+
+ @Test
+ public void testReload() {
+ TieredStoreMetadataManager metadataManager = (TieredStoreMetadataManager) metadataStore;
+ metadataManager.addTopic(mq.getTopic(), 1);
+ metadataManager.addQueue(mq, 2);
+ metadataManager.persist();
+ Assert.assertTrue(new File(metadataManager.configFilePath()).exists());
+
+ metadataManager = new TieredStoreMetadataManager(storeConfig);
+ metadataManager.load();
+
+ TopicMetadata topicMetadata = metadataManager.getTopic(mq.getTopic());
+ Assert.assertNotNull(topicMetadata);
+ Assert.assertEquals(topicMetadata.getReserveTime(), 1);
+
+ QueueMetadata queueMetadata = metadataManager.getQueue(mq);
+ Assert.assertNotNull(queueMetadata);
+ Assert.assertEquals(queueMetadata.getMinOffset(), 2);
+ }
+}