You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/01/12 11:03:01 UTC

[rocketmq] branch develop updated: [ISSUE #5863] implement tiered store metadata (#5864)

This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cbad53d60 [ISSUE #5863] implement tiered store metadata (#5864)
cbad53d60 is described below

commit cbad53d60d7a766d5ca290d45d28ed648e1a34b6
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Thu Jan 12 19:02:40 2023 +0800

    [ISSUE #5863] implement tiered store metadata (#5864)
    
    * implement tiered store metadata
    
    * fix bazel
---
 pom.xml                                            |   1 +
 tieredstore/BUILD.bazel                            |  55 +++++++
 tieredstore/pom.xml                                |  46 ++++++
 .../tiered/common/TieredMessageStoreConfig.java    |  31 ++++
 .../store/tiered/metadata/FileSegmentMetadata.java | 130 ++++++++++++++++
 .../store/tiered/metadata/QueueMetadata.java       |  70 +++++++++
 .../metadata/TieredStoreMetadataManager.java       | 167 +++++++++++++++++++++
 .../TieredStoreMetadataSerializeWrapper.java       |  54 +++++++
 .../tiered/metadata/TieredStoreMetadataStore.java  |  38 +++++
 .../store/tiered/metadata/TopicMetadata.java       |  77 ++++++++++
 .../store/tiered/metadata/MetadataStoreTest.java   | 142 ++++++++++++++++++
 11 files changed, 811 insertions(+)

diff --git a/pom.xml b/pom.xml
index 040f8c5b6..9400633b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
         <module>container</module>
         <module>controller</module>
         <module>proxy</module>
+        <module>tieredstore</module>
     </modules>
 
     <build>
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
new file mode 100644
index 000000000..5a0176e89
--- /dev/null
+++ b/tieredstore/BUILD.bazel
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+load("//bazel:GenTestRules.bzl", "GenTestRules")
+
+java_library(
+    name = "tieredstore",
+    srcs = glob(["src/main/java/**/*.java"]),
+    visibility = ["//visibility:public"],
+    deps = [
+        "//common",
+        "//remoting",
+        "//store",
+        "@maven//:com_google_code_findbugs_jsr305",
+        "@maven//:org_apache_tomcat_annotations_api",
+    ],
+)
+
+java_library(
+    name = "tests",
+    srcs = glob(["src/test/java/**/*.java"]),
+    resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"]),
+    visibility = ["//visibility:public"],
+    deps = [
+        ":tieredstore",
+        "//:test_deps",
+        "//common",
+        "@maven//:commons_io_commons_io",
+    ],
+)
+
+GenTestRules(
+    name = "GeneratedTestRules",
+    exclude_tests = [
+    ],
+    medium_tests = [
+    ],
+    test_files = glob(["src/test/java/**/*Test.java"]),
+    deps = [
+        ":tests",
+    ],
+)
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
new file mode 100644
index 000000000..c983e3f08
--- /dev/null
+++ b/tieredstore/pom.xml
@@ -0,0 +1,46 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>5.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-tiered-store</artifactId>
+    <name>rocketmq-tiered-store ${project.version}</name>
+
+    <properties>
+        <project.root>${basedir}/..</project.root>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-store</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
new file mode 100644
index 000000000..c85317177
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.common;
+
+import java.io.File;
+
+public class TieredMessageStoreConfig {
+    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+
+    public String getStorePathRootDir() {
+        return storePathRootDir;
+    }
+
+    public void setStorePathRootDir(String storePathRootDir) {
+        this.storePathRootDir = storePathRootDir;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java
new file mode 100644
index 000000000..d31de41bb
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/FileSegmentMetadata.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class FileSegmentMetadata {
+    public static final int STATUS_NEW = 0;
+    public static final int STATUS_SEALED = 1;
+    public static final int STATUS_DELETED = 2;
+
+    private MessageQueue queue;
+    private int status;
+    private int type;
+    private long baseOffset;
+    private String path;
+    private long size;
+    private long createTimestamp;
+    private long beginTimestamp;
+    private long endTimestamp;
+    private long sealTimestamp;
+
+    // default constructor is used by fastjson
+    public FileSegmentMetadata() {
+
+    }
+
+    public FileSegmentMetadata(MessageQueue queue, int type, long baseOffset, String path) {
+        this.queue = queue;
+        this.status = STATUS_NEW;
+        this.type = type;
+        this.baseOffset = baseOffset;
+        this.path = path;
+        this.createTimestamp = System.currentTimeMillis();
+    }
+
+    public MessageQueue getQueue() {
+        return queue;
+    }
+
+    public void setQueue(MessageQueue queue) {
+        this.queue = queue;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public void setType(int type) {
+        this.type = type;
+    }
+
+    public long getBaseOffset() {
+        return baseOffset;
+    }
+
+    public void setBaseOffset(long baseOffset) {
+        this.baseOffset = baseOffset;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    public long getCreateTimestamp() {
+        return createTimestamp;
+    }
+
+    public void setCreateTimestamp(long createTimestamp) {
+        this.createTimestamp = createTimestamp;
+    }
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    public void setBeginTimestamp(long beginTimestamp) {
+        this.beginTimestamp = beginTimestamp;
+    }
+
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    public void setEndTimestamp(long endTimestamp) {
+        this.endTimestamp = endTimestamp;
+    }
+
+    public long getSealTimestamp() {
+        return sealTimestamp;
+    }
+
+    public void setSealTimestamp(long sealTimestamp) {
+        this.sealTimestamp = sealTimestamp;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java
new file mode 100644
index 000000000..e156f6fc1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/QueueMetadata.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class QueueMetadata {
+    private MessageQueue queue;
+    private long minOffset;
+    private long maxOffset;
+    private long updateTimestamp;
+
+    // default constructor is used by fastjson
+    public QueueMetadata() {
+
+    }
+
+    public QueueMetadata(MessageQueue queue, long minOffset, long maxOffset) {
+        this.queue = queue;
+        this.minOffset = minOffset;
+        this.maxOffset = maxOffset;
+        this.updateTimestamp = System.currentTimeMillis();
+    }
+
+    public MessageQueue getQueue() {
+        return queue;
+    }
+
+    public void setQueue(MessageQueue queue) {
+        this.queue = queue;
+    }
+
+    public long getMinOffset() {
+        return minOffset;
+    }
+
+    public void setMinOffset(long minOffset) {
+        this.minOffset = minOffset;
+    }
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    public long getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+
+    public void setUpdateTimestamp(long updateTimestamp) {
+        this.updateTimestamp = updateTimestamp;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
new file mode 100644
index 000000000..e4f241af1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+
+public class TieredStoreMetadataManager extends ConfigManager implements TieredStoreMetadataStore {
+    private final AtomicInteger maxTopicId = new AtomicInteger(0);
+    private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
+    private final TieredMessageStoreConfig storeConfig;
+
+    public TieredStoreMetadataManager(TieredMessageStoreConfig storeConfig) {
+        this.storeConfig = storeConfig;
+    }
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String encode(boolean prettyFormat) {
+        TieredStoreMetadataSerializeWrapper dataWrapper = new TieredStoreMetadataSerializeWrapper();
+        dataWrapper.setMaxTopicId(maxTopicId);
+        dataWrapper.setTopicMetadataTable(topicMetadataTable);
+        dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
+        return dataWrapper.toJson(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TieredStoreMetadataSerializeWrapper dataWrapper =
+                TieredStoreMetadataSerializeWrapper.fromJson(jsonString, TieredStoreMetadataSerializeWrapper.class);
+            if (dataWrapper != null) {
+                maxTopicId.set(dataWrapper.getMaxTopicId().get());
+                topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
+                dataWrapper.getQueueMetadataTable()
+                    .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
+            }
+        }
+    }
+
+    @Override
+    @Nullable
+    public TopicMetadata getTopic(String topic) {
+        return topicMetadataTable.get(topic);
+    }
+
+    @Override
+    public void iterateTopic(Consumer<TopicMetadata> callback) {
+        topicMetadataTable.values().forEach(callback);
+    }
+
+    @Override
+    public TopicMetadata addTopic(String topic, long reserveTime) {
+        TopicMetadata old = getTopic(topic);
+        if (old != null) {
+            return old;
+        }
+        TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
+        topicMetadataTable.put(topic, metadata);
+        return metadata;
+    }
+
+    @Override
+    public void updateTopicReserveTime(String topic, long reserveTime) {
+        TopicMetadata metadata = getTopic(topic);
+        if (metadata == null) {
+            return;
+        }
+        metadata.setReserveTime(reserveTime);
+        metadata.setUpdateTimestamp(System.currentTimeMillis());
+    }
+
+    @Override
+    public void updateTopicStatus(String topic, int status) {
+        TopicMetadata metadata = getTopic(topic);
+        if (metadata == null) {
+            return;
+        }
+        metadata.setStatus(status);
+        metadata.setUpdateTimestamp(System.currentTimeMillis());
+    }
+
+    @Override
+    public void deleteTopic(String topic) {
+        topicMetadataTable.remove(topic);
+    }
+
+    @Override
+    @Nullable
+    public QueueMetadata getQueue(MessageQueue queue) {
+        if (!queueMetadataTable.containsKey(queue.getTopic())) {
+            return null;
+        }
+        return queueMetadataTable.get(queue.getTopic())
+            .get(queue.getQueueId());
+    }
+
+    @Override
+    public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
+        queueMetadataTable.get(topic)
+            .values()
+            .forEach(callback);
+    }
+
+    @Override
+    public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
+        QueueMetadata old = getQueue(queue);
+        if (old != null) {
+            return old;
+        }
+        QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
+        queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
+            .put(queue.getQueueId(), metadata);
+        return metadata;
+    }
+
+    @Override
+    public void updateQueue(QueueMetadata metadata) {
+        MessageQueue queue = metadata.getQueue();
+        if (queueMetadataTable.containsKey(queue.getTopic())) {
+            ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
+            if (metadataMap.containsKey(queue.getQueueId())) {
+                metadata.setUpdateTimestamp(System.currentTimeMillis());
+                metadataMap.put(queue.getQueueId(), metadata);
+            }
+        }
+    }
+
+    @Override
+    public void deleteQueue(MessageQueue queue) {
+        if (queueMetadataTable.containsKey(queue.getTopic())) {
+            queueMetadataTable.get(queue.getTopic())
+                .remove(queue.getQueueId());
+        }
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
new file mode 100644
index 000000000..e4e068aff
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class TieredStoreMetadataSerializeWrapper extends RemotingSerializable {
+    private AtomicInteger maxTopicId;
+    private Map<String /*topic*/, TopicMetadata> topicMetadataTable;
+    private Map<String /*topic*/, Map<Integer /*queueId*/, QueueMetadata>> queueMetadataTable;
+
+
+    public AtomicInteger getMaxTopicId() {
+        return maxTopicId;
+    }
+
+    public void setMaxTopicId(AtomicInteger maxTopicId) {
+        this.maxTopicId = maxTopicId;
+    }
+
+    public Map<String, TopicMetadata> getTopicMetadataTable() {
+        return topicMetadataTable;
+    }
+
+    public void setTopicMetadataTable(
+        Map<String, TopicMetadata> topicMetadataTable) {
+        this.topicMetadataTable = topicMetadataTable;
+    }
+
+    public Map<String, Map<Integer, QueueMetadata>> getQueueMetadataTable() {
+        return queueMetadataTable;
+    }
+
+    public void setQueueMetadataTable(
+        Map<String, Map<Integer, QueueMetadata>> queueMetadataTable) {
+        this.queueMetadataTable = queueMetadataTable;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
new file mode 100644
index 000000000..7701258af
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface TieredStoreMetadataStore {
+    @Nullable
+    TopicMetadata getTopic(String topic);
+    void iterateTopic(Consumer<TopicMetadata> callback);
+    TopicMetadata addTopic(String topic, long reserveTime);
+    void updateTopicReserveTime(String topic, long reserveTime);
+    void updateTopicStatus(String topic, int status);
+    void deleteTopic(String topic);
+
+    @Nullable
+    QueueMetadata getQueue(MessageQueue queue);
+    void iterateQueue(String topic, Consumer<QueueMetadata> callback);
+    QueueMetadata addQueue(MessageQueue queue, long baseOffset);
+    void updateQueue(QueueMetadata metadata);
+    void deleteQueue(MessageQueue queue);
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java
new file mode 100644
index 000000000..37eca400e
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TopicMetadata.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+public class TopicMetadata {
+    private int topicId;
+    String topic;
+    long reserveTime;
+    int status;
+    long updateTimestamp;
+
+    // default constructor is used by fastjson
+    public TopicMetadata() {
+
+    }
+
+    public TopicMetadata(int topicId, String topic, long reserveTime) {
+        this.topicId = topicId;
+        this.topic = topic;
+        this.reserveTime = reserveTime;
+        this.updateTimestamp = System.currentTimeMillis();
+    }
+
+    public int getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(int topicId) {
+        this.topicId = topicId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getReserveTime() {
+        return reserveTime;
+    }
+
+    public void setReserveTime(long reserveTime) {
+        this.reserveTime = reserveTime;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public long getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+
+    public void setUpdateTimestamp(long updateTimestamp) {
+        this.updateTimestamp = updateTimestamp;
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
new file mode 100644
index 000000000..ff73c173a
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataStoreTest {
+    MessageQueue mq;
+    TieredMessageStoreConfig storeConfig;
+    TieredStoreMetadataStore metadataStore;
+
+    @Before
+    public void setUp() {
+        mq = new MessageQueue("MetadataStoreTest", "broker", 1);
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setStorePathRootDir("/tmp/rmqut");
+        metadataStore = new TieredStoreMetadataManager(storeConfig);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        FileUtils.deleteDirectory(new File("/tmp/rmqut"));
+    }
+
+    @Test
+    public void testQueue() {
+        QueueMetadata queueMetadata = metadataStore.getQueue(mq);
+        Assert.assertNull(queueMetadata);
+
+        queueMetadata = metadataStore.addQueue(mq, -1);
+        Assert.assertEquals(queueMetadata.getMinOffset(), -1);
+        Assert.assertEquals(queueMetadata.getMaxOffset(), -1);
+
+        long currentTimeMillis = System.currentTimeMillis();
+        queueMetadata.setMinOffset(0);
+        queueMetadata.setMaxOffset(0);
+        metadataStore.updateQueue(queueMetadata);
+        queueMetadata = metadataStore.getQueue(mq);
+        Assert.assertTrue(queueMetadata.getUpdateTimestamp() >= currentTimeMillis);
+        Assert.assertEquals(queueMetadata.getMinOffset(), 0);
+        Assert.assertEquals(queueMetadata.getMaxOffset(), 0);
+
+        MessageQueue mq2 = new MessageQueue("MetadataStoreTest", "broker", 2);
+        metadataStore.addQueue(mq2, 1);
+        AtomicInteger i = new AtomicInteger(0);
+        metadataStore.iterateQueue(mq.getTopic(), metadata -> {
+            Assert.assertEquals(i.get(), metadata.getMinOffset());
+            i.getAndIncrement();
+        });
+        Assert.assertEquals(i.get(), 2);
+
+        metadataStore.deleteQueue(mq);
+        queueMetadata = metadataStore.getQueue(mq);
+        Assert.assertNull(queueMetadata);
+    }
+
+    @Test
+    public void testTopic() {
+        TopicMetadata topicMetadata = metadataStore.getTopic(mq.getTopic());
+        Assert.assertNull(topicMetadata);
+
+        metadataStore.addTopic(mq.getTopic(), 2);
+        topicMetadata = metadataStore.getTopic(mq.getTopic());
+        Assert.assertEquals(mq.getTopic(), topicMetadata.getTopic());
+        Assert.assertEquals(topicMetadata.getStatus(), 0);
+        Assert.assertEquals(topicMetadata.getReserveTime(), 2);
+        Assert.assertEquals(topicMetadata.getTopicId(), 0);
+
+        metadataStore.updateTopicStatus(mq.getTopic(), 1);
+        metadataStore.updateTopicReserveTime(mq.getTopic(), 0);
+        topicMetadata = metadataStore.getTopic(mq.getTopic());
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertEquals(topicMetadata.getStatus(), 1);
+        Assert.assertEquals(topicMetadata.getReserveTime(), 0);
+
+        metadataStore.addTopic(mq.getTopic() + "1", 1);
+        metadataStore.updateTopicStatus(mq.getTopic() + "1", 2);
+
+        metadataStore.addTopic(mq.getTopic() + "2", 2);
+        metadataStore.updateTopicStatus(mq.getTopic() + "2", 3);
+
+        AtomicInteger n = new AtomicInteger();
+        metadataStore.iterateTopic(metadata -> {
+            long i = metadata.getReserveTime();
+            Assert.assertEquals(metadata.getTopicId(), i);
+            Assert.assertEquals(metadata.getStatus(), i + 1);
+            if (i == 2) {
+                metadataStore.deleteTopic(metadata.getTopic());
+            }
+            n.getAndIncrement();
+        });
+        Assert.assertEquals(3, n.get());
+
+        Assert.assertNull(metadataStore.getTopic(mq.getTopic() + "2"));
+
+        Assert.assertNotNull(metadataStore.getTopic(mq.getTopic()));
+        Assert.assertNotNull(metadataStore.getTopic(mq.getTopic() + "1"));
+    }
+
+    @Test
+    public void testReload() {
+        TieredStoreMetadataManager metadataManager = (TieredStoreMetadataManager) metadataStore;
+        metadataManager.addTopic(mq.getTopic(), 1);
+        metadataManager.addQueue(mq, 2);
+        metadataManager.persist();
+        Assert.assertTrue(new File(metadataManager.configFilePath()).exists());
+
+        metadataManager = new TieredStoreMetadataManager(storeConfig);
+        metadataManager.load();
+
+        TopicMetadata topicMetadata = metadataManager.getTopic(mq.getTopic());
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertEquals(topicMetadata.getReserveTime(), 1);
+
+        QueueMetadata queueMetadata = metadataManager.getQueue(mq);
+        Assert.assertNotNull(queueMetadata);
+        Assert.assertEquals(queueMetadata.getMinOffset(), 2);
+    }
+}