You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/01/16 09:59:30 UTC
[rocketmq] branch develop updated: [ISSUE #5874] implement file queue for tiered storage (#5875)
This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 970021128 [ISSUE #5874] implement file queue for tiered storage (#5875)
970021128 is described below
commit 9700211285d65b33e82409c086aed5a31fdf97ca
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Mon Jan 16 17:59:09 2023 +0800
[ISSUE #5874] implement file queue for tiered storage (#5875)
* implement file queue for tiered storage
* fix bazel
* suppress error found by spotbugs
* fix tests in windows
* fixed according to comment
* fix warning in bazel build
---
style/spotbugs-suppressions.xml | 5 +
tieredstore/BUILD.bazel | 6 +
tieredstore/pom.xml | 6 +
...edMessageStoreConfig.java => AppendResult.java} | 20 +-
...edMessageStoreConfig.java => BoundaryType.java} | 29 +-
.../tiered/common/TieredMessageStoreConfig.java | 293 +++++++++++
.../store/tiered/common/TieredStoreExecutor.java | 93 ++++
.../store/tiered/container/TieredCommitLog.java | 119 +++++
.../store/tiered/container/TieredConsumeQueue.java | 111 +++++
.../store/tiered/container/TieredFileQueue.java | 519 ++++++++++++++++++++
.../store/tiered/container/TieredFileSegment.java | 538 +++++++++++++++++++++
.../store/tiered/container/TieredIndexFile.java | 427 ++++++++++++++++
.../TieredStoreErrorCode.java} | 22 +-
.../tiered/exception/TieredStoreException.java | 63 +++
.../tiered/metadata/TieredMetadataManager.java | 321 ++++++++++++
...er.java => TieredMetadataSerializeWrapper.java} | 2 +-
...MetadataStore.java => TieredMetadataStore.java} | 48 +-
.../metadata/TieredStoreMetadataManager.java | 167 -------
.../CQItemBufferUtil.java} | 18 +-
.../store/tiered/util/MessageBufferUtil.java | 165 +++++++
.../store/tiered/util/TieredStoreUtil.java | 157 ++++++
.../tiered/container/TieredFileQueueTest.java | 233 +++++++++
.../tiered/container/TieredFileSegmentTest.java | 153 ++++++
.../tiered/container/TieredIndexFileTest.java | 130 +++++
.../store/tiered/metadata/MetadataStoreTest.java | 58 ++-
.../store/tiered/mock/MemoryFileSegment.java | 114 +++++
.../store/tiered/util/CQItemBufferUtilTest.java | 51 ++
.../store/tiered/util/MessageBufferUtilTest.java | 243 ++++++++++
.../store/tiered/util/TieredStoreUtilTest.java | 59 +++
tieredstore/src/test/resources/logback.xml | 29 ++
30 files changed, 3984 insertions(+), 215 deletions(-)
diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml
index 607080cfb..fa6d85508 100644
--- a/style/spotbugs-suppressions.xml
+++ b/style/spotbugs-suppressions.xml
@@ -30,6 +30,11 @@
<Method name="indexKeyHashMethod" />
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
</Match>
+ <Match>
+ <Class name="org.apache.rocketmq.store.tiered.container.TieredIndexFile"/>
+ <Method name="indexKeyHashMethod" />
+ <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
+ </Match>
<Match>
<Class name="org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener"/>
<Method name="toMessageExtBrokerInner" />
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
index 5a0176e89..fb9fadfdd 100644
--- a/tieredstore/BUILD.bazel
+++ b/tieredstore/BUILD.bazel
@@ -25,6 +25,10 @@ java_library(
"//remoting",
"//store",
"@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:com_google_guava_guava",
+ "@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
+ "@maven//:io_github_aliyunmq_rocketmq_logback_classic",
+ "@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_tomcat_annotations_api",
],
)
@@ -38,7 +42,9 @@ java_library(
":tieredstore",
"//:test_deps",
"//common",
+ "//store",
"@maven//:commons_io_commons_io",
+ "@maven//:org_apache_commons_commons_lang3",
],
)
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index c983e3f08..ff7435ce0 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -37,6 +37,12 @@
<artifactId>rocketmq-store</artifactId>
</dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
similarity index 70%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
index c85317177..0dd5e9a8e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
@@ -16,16 +16,12 @@
*/
package org.apache.rocketmq.store.tiered.common;
-import java.io.File;
-
-public class TieredMessageStoreConfig {
- private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
-
- public String getStorePathRootDir() {
- return storePathRootDir;
- }
-
- public void setStorePathRootDir(String storePathRootDir) {
- this.storePathRootDir = storePathRootDir;
- }
+public enum AppendResult {
+ SUCCESS,
+ OFFSET_INCORRECT,
+ BUFFER_FULL,
+ FILE_FULL,
+ IO_ERROR,
+ FILE_CLOSE,
+ UNKNOWN_ERROR
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
similarity index 62%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
index c85317177..0e78f1211 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
@@ -16,16 +16,31 @@
*/
package org.apache.rocketmq.store.tiered.common;
-import java.io.File;
+public enum BoundaryType {
+ /**
+ * Indicate that lower boundary is expected.
+ */
+ LOWER("lower"),
-public class TieredMessageStoreConfig {
- private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+ /**
+ * Indicate that upper boundary is expected.
+ */
+ UPPER("upper");
- public String getStorePathRootDir() {
- return storePathRootDir;
+ private String name;
+
+ BoundaryType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
}
- public void setStorePathRootDir(String storePathRootDir) {
- this.storePathRootDir = storePathRootDir;
+ public static BoundaryType getType(String name) {
+ if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
+ return UPPER;
+ }
+ return LOWER;
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
index c85317177..f7712c41d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
@@ -17,9 +17,126 @@
package org.apache.rocketmq.store.tiered.common;
import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
public class TieredMessageStoreConfig {
+ private String brokerName = localHostName();
+ private String brokerClusterName = "DefaultCluster";
+ private TieredStorageLevel tieredStorageLevel = TieredStorageLevel.NOT_IN_DISK;
+ public enum TieredStorageLevel {
+ DISABLE(0),
+ NOT_IN_DISK(1),
+ NOT_IN_MEM(2),
+ FORCE(3);
+
+ private final int value;
+
+ TieredStorageLevel(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static TieredStorageLevel valueOf(int value) {
+ switch (value) {
+ case 1:
+ return NOT_IN_DISK;
+ case 2:
+ return NOT_IN_MEM;
+ case 3:
+ return FORCE;
+ default:
+ return DISABLE;
+ }
+ }
+
+ public boolean isEnable() {
+ return this.value > 0;
+ }
+
+ public boolean check(TieredStorageLevel targetLevel) {
+ return this.value >= targetLevel.value;
+ }
+ }
+
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+ private boolean messageIndexEnable = true;
+
+ // CommitLog file size, default is 1G
+ private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
+ // ConsumeQueue file size, default is 100M
+ private long tieredStoreConsumeQueueMaxSize = 100 * 1024 * 1024;
+ private int tieredStoreIndexFileMaxHashSlotNum = 5000000;
+ private int tieredStoreIndexFileMaxIndexNum = 5000000 * 4;
+ // index file will be forced to roll to the next file after being idle for the specified time, default is 3h
+ private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;
+ private String tieredMetadataServiceProvider = "org.apache.rocketmq.store.tiered.metadata.TieredMetadataManager";
+ private String tieredBackendServiceProvider = "";
+ // file reserved time, default is 72 hours
+ private int tieredStoreFileReservedTime = 72;
+ // time of forcing commitLog to roll to next file, default is 24 hours
+ private int commitLogRollingInterval = 24;
+ // rolling will only happen if file segment size is larger than commitLogRollingMinimumSize, default is 128M
+ private int commitLogRollingMinimumSize = 128 * 1024 * 1024;
+ // default is 100, unit is millisecond
+ private int maxCommitJitter = 100;
+ // Cached message count larger than this value will trigger async commit. default is 2500
+ private int tieredStoreGroupCommitCount = 2500;
+ // Cached message size larger than this value will trigger async commit. default is 32M
+ private int tieredStoreGroupCommitSize = 32 * 1024 * 1024;
+ // Cached message count larger than this value will suspend append. default is 10000
+ private int tieredStoreMaxGroupCommitCount = 10000;
+ private int readAheadMinFactor = 2;
+ private int readAheadMaxFactor = 24;
+ private int readAheadBatchSizeFactorThreshold = 8;
+ private int readAheadMessageCountThreshold = 2048;
+ private int readAheadMessageSizeThreshold = 128 * 1024 * 1024;
+ private long readAheadCacheExpireDuration = 10 * 1000;
+ private double readAheadCacheSizeThresholdRate = 0.3;
+
+ public static String localHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException ignore) {
+ }
+
+ return "DEFAULT_BROKER";
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerClusterName() {
+ return brokerClusterName;
+ }
+
+ public void setBrokerClusterName(String brokerClusterName) {
+ this.brokerClusterName = brokerClusterName;
+ }
+
+ public TieredStorageLevel getTieredStorageLevel() {
+ return tieredStorageLevel;
+ }
+
+ public void setTieredStorageLevel(TieredStorageLevel tieredStorageLevel) {
+ this.tieredStorageLevel = tieredStorageLevel;
+ }
+
+ public void setTieredStorageLevel(int tieredStorageLevel) {
+ this.tieredStorageLevel = TieredStorageLevel.valueOf(tieredStorageLevel);
+ }
+
+ public void setTieredStorageLevel(String tieredStorageLevel) {
+ this.tieredStorageLevel = TieredStorageLevel.valueOf(tieredStorageLevel);
+ }
public String getStorePathRootDir() {
return storePathRootDir;
@@ -28,4 +145,180 @@ public class TieredMessageStoreConfig {
public void setStorePathRootDir(String storePathRootDir) {
this.storePathRootDir = storePathRootDir;
}
+
+ public boolean isMessageIndexEnable() {
+ return messageIndexEnable;
+ }
+
+ public void setMessageIndexEnable(boolean messageIndexEnable) {
+ this.messageIndexEnable = messageIndexEnable;
+ }
+
+ public long getTieredStoreCommitLogMaxSize() {
+ return tieredStoreCommitLogMaxSize;
+ }
+
+ public void setTieredStoreCommitLogMaxSize(long tieredStoreCommitLogMaxSize) {
+ this.tieredStoreCommitLogMaxSize = tieredStoreCommitLogMaxSize;
+ }
+
+ public long getTieredStoreConsumeQueueMaxSize() {
+ return tieredStoreConsumeQueueMaxSize;
+ }
+
+ public void setTieredStoreConsumeQueueMaxSize(long tieredStoreConsumeQueueMaxSize) {
+ this.tieredStoreConsumeQueueMaxSize = tieredStoreConsumeQueueMaxSize;
+ }
+
+ public int getTieredStoreIndexFileMaxHashSlotNum() {
+ return tieredStoreIndexFileMaxHashSlotNum;
+ }
+
+ public void setTieredStoreIndexFileMaxHashSlotNum(int tieredStoreIndexFileMaxHashSlotNum) {
+ this.tieredStoreIndexFileMaxHashSlotNum = tieredStoreIndexFileMaxHashSlotNum;
+ }
+
+ public int getTieredStoreIndexFileMaxIndexNum() {
+ return tieredStoreIndexFileMaxIndexNum;
+ }
+
+ public void setTieredStoreIndexFileMaxIndexNum(int tieredStoreIndexFileMaxIndexNum) {
+ this.tieredStoreIndexFileMaxIndexNum = tieredStoreIndexFileMaxIndexNum;
+ }
+
+ public int getTieredStoreIndexFileRollingIdleInterval() {
+ return tieredStoreIndexFileRollingIdleInterval;
+ }
+
+ public void setTieredStoreIndexFileRollingIdleInterval(int tieredStoreIndexFileRollingIdleInterval) {
+ this.tieredStoreIndexFileRollingIdleInterval = tieredStoreIndexFileRollingIdleInterval;
+ }
+
+ public String getTieredMetadataServiceProvider() {
+ return tieredMetadataServiceProvider;
+ }
+
+ public void setTieredMetadataServiceProvider(String tieredMetadataServiceProvider) {
+ this.tieredMetadataServiceProvider = tieredMetadataServiceProvider;
+ }
+
+ public String getTieredBackendServiceProvider() {
+ return tieredBackendServiceProvider;
+ }
+
+ public void setTieredBackendServiceProvider(String tieredBackendServiceProvider) {
+ this.tieredBackendServiceProvider = tieredBackendServiceProvider;
+ }
+
+ public int getTieredStoreFileReservedTime() {
+ return tieredStoreFileReservedTime;
+ }
+
+ public void setTieredStoreFileReservedTime(int tieredStoreFileReservedTime) {
+ this.tieredStoreFileReservedTime = tieredStoreFileReservedTime;
+ }
+
+ public int getCommitLogRollingInterval() {
+ return commitLogRollingInterval;
+ }
+
+ public void setCommitLogRollingInterval(int commitLogRollingInterval) {
+ this.commitLogRollingInterval = commitLogRollingInterval;
+ }
+
+ public int getCommitLogRollingMinimumSize() {
+ return commitLogRollingMinimumSize;
+ }
+
+ public void setCommitLogRollingMinimumSize(int commitLogRollingMinimumSize) {
+ this.commitLogRollingMinimumSize = commitLogRollingMinimumSize;
+ }
+
+ public int getMaxCommitJitter() {
+ return maxCommitJitter;
+ }
+
+ public void setMaxCommitJitter(int maxCommitJitter) {
+ this.maxCommitJitter = maxCommitJitter;
+ }
+
+ public int getTieredStoreGroupCommitCount() {
+ return tieredStoreGroupCommitCount;
+ }
+
+ public void setTieredStoreGroupCommitCount(int tieredStoreGroupCommitCount) {
+ this.tieredStoreGroupCommitCount = tieredStoreGroupCommitCount;
+ }
+
+ public int getTieredStoreGroupCommitSize() {
+ return tieredStoreGroupCommitSize;
+ }
+
+ public void setTieredStoreGroupCommitSize(int tieredStoreGroupCommitSize) {
+ this.tieredStoreGroupCommitSize = tieredStoreGroupCommitSize;
+ }
+
+ public int getTieredStoreMaxGroupCommitCount() {
+ return tieredStoreMaxGroupCommitCount;
+ }
+
+ public void setTieredStoreMaxGroupCommitCount(int tieredStoreMaxGroupCommitCount) {
+ this.tieredStoreMaxGroupCommitCount = tieredStoreMaxGroupCommitCount;
+ }
+
+ public int getReadAheadMinFactor() {
+ return readAheadMinFactor;
+ }
+
+ public void setReadAheadMinFactor(int readAheadMinFactor) {
+ this.readAheadMinFactor = readAheadMinFactor;
+ }
+
+ public int getReadAheadMaxFactor() {
+ return readAheadMaxFactor;
+ }
+
+ public int getReadAheadBatchSizeFactorThreshold() {
+ return readAheadBatchSizeFactorThreshold;
+ }
+
+ public void setReadAheadBatchSizeFactorThreshold(int readAheadBatchSizeFactorThreshold) {
+ this.readAheadBatchSizeFactorThreshold = readAheadBatchSizeFactorThreshold;
+ }
+
+ public void setReadAheadMaxFactor(int readAheadMaxFactor) {
+ this.readAheadMaxFactor = readAheadMaxFactor;
+ }
+
+ public int getReadAheadMessageCountThreshold() {
+ return readAheadMessageCountThreshold;
+ }
+
+ public void setReadAheadMessageCountThreshold(int readAheadMessageCountThreshold) {
+ this.readAheadMessageCountThreshold = readAheadMessageCountThreshold;
+ }
+
+ public int getReadAheadMessageSizeThreshold() {
+ return readAheadMessageSizeThreshold;
+ }
+
+ public void setReadAheadMessageSizeThreshold(int readAheadMessageSizeThreshold) {
+ this.readAheadMessageSizeThreshold = readAheadMessageSizeThreshold;
+ }
+
+ public long getReadAheadCacheExpireDuration() {
+ return readAheadCacheExpireDuration;
+ }
+
+ public void setReadAheadCacheExpireDuration(long duration) {
+ this.readAheadCacheExpireDuration = duration;
+ }
+
+ public double getReadAheadCacheSizeThresholdRate() {
+ return readAheadCacheSizeThresholdRate;
+ }
+
+ public void setReadAheadCacheSizeThresholdRate(double rate) {
+ this.readAheadCacheSizeThresholdRate = rate;
+ }
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java
new file mode 100644
index 000000000..3c56b3dc1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.common;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+
+public class TieredStoreExecutor {
+ private static final int QUEUE_CAPACITY = 10000;
+ private static final BlockingQueue<Runnable> DISPATCH_THREAD_POOL_QUEUE;
+ public static final ExecutorService DISPATCH_EXECUTOR;
+ public static final ScheduledExecutorService COMMON_SCHEDULED_EXECUTOR;
+
+ public static final ScheduledExecutorService COMMIT_EXECUTOR;
+
+ public static final ScheduledExecutorService CLEAN_EXPIRED_FILE_EXECUTOR;
+
+ private static final BlockingQueue<Runnable> FETCH_DATA_THREAD_POOL_QUEUE;
+ public static final ExecutorService FETCH_DATA_EXECUTOR;
+
+ private static final BlockingQueue<Runnable> COMPACT_INDEX_FILE_THREAD_POOL_QUEUE;
+ public static final ExecutorService COMPACT_INDEX_FILE_EXECUTOR;
+
+ static {
+ DISPATCH_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ DISPATCH_EXECUTOR = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ DISPATCH_THREAD_POOL_QUEUE,
+ new ThreadFactoryImpl("TieredCommonExecutor_"));
+
+ COMMON_SCHEDULED_EXECUTOR = new ScheduledThreadPoolExecutor(
+ Math.max(4, Runtime.getRuntime().availableProcessors()),
+ new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+
+ COMMIT_EXECUTOR = new ScheduledThreadPoolExecutor(
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ new ThreadFactoryImpl("TieredCommitExecutor_"));
+
+ CLEAN_EXPIRED_FILE_EXECUTOR = new ScheduledThreadPoolExecutor(
+ Math.max(4, Runtime.getRuntime().availableProcessors()),
+ new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+
+ FETCH_DATA_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ FETCH_DATA_EXECUTOR = new ThreadPoolExecutor(
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ FETCH_DATA_THREAD_POOL_QUEUE,
+ new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+
+ COMPACT_INDEX_FILE_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ COMPACT_INDEX_FILE_EXECUTOR = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ COMPACT_INDEX_FILE_THREAD_POOL_QUEUE,
+ new ThreadFactoryImpl("TieredCompactIndexFileExecutor_"));
+ }
+
+ public static void shutdown() {
+ DISPATCH_EXECUTOR.shutdown();
+ COMMON_SCHEDULED_EXECUTOR.shutdown();
+ COMMIT_EXECUTOR.shutdown();
+ CLEAN_EXPIRED_FILE_EXECUTOR.shutdown();
+ FETCH_DATA_EXECUTOR.shutdown();
+ COMPACT_INDEX_FILE_EXECUTOR.shutdown();
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java
new file mode 100644
index 000000000..91e26de91
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+public class TieredCommitLog {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ public static final int CODA_SIZE = 4 /* item size: int, 4 bytes */
+ + 4 /* magic code: int, 4 bytes */
+ + 8 /* max store timestamp: long, 8 bytes */;
+ public static final int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
+
+ private final MessageQueue messageQueue;
+ private final TieredMessageStoreConfig storeConfig;
+ private final TieredFileQueue fileQueue;
+
+ public TieredCommitLog(MessageQueue messageQueue, TieredMessageStoreConfig storeConfig)
+ throws ClassNotFoundException, NoSuchMethodException {
+ this.messageQueue = messageQueue;
+ this.storeConfig = storeConfig;
+ this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG, messageQueue, storeConfig);
+ if (fileQueue.getBaseOffset() == -1) {
+ fileQueue.setBaseOffset(0);
+ }
+ }
+
+ public long getMinOffset() {
+ return fileQueue.getMinOffset();
+ }
+
+ public long getCommitOffset() {
+ return fileQueue.getCommitOffset();
+ }
+
+ public long getCommitMsgQueueOffset() {
+ return fileQueue.getCommitMsgQueueOffset();
+ }
+
+ public long getMaxOffset() {
+ return fileQueue.getMaxOffset();
+ }
+
+ public long getBeginTimestamp() {
+ TieredFileSegment firstIndexFile = fileQueue.getFileByIndex(0);
+ if (firstIndexFile == null) {
+ return -1;
+ }
+ long beginTimestamp = firstIndexFile.getBeginTimestamp();
+ return beginTimestamp != Long.MAX_VALUE ? beginTimestamp : -1;
+ }
+
+ public long getEndTimestamp() {
+ return fileQueue.getFileToWrite().getEndTimestamp();
+ }
+
+ public AppendResult append(ByteBuffer byteBuf) {
+ return fileQueue.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf));
+ }
+
+ public AppendResult append(ByteBuffer byteBuf, boolean commit) {
+ return fileQueue.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf), commit);
+ }
+
+ public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+ return fileQueue.readAsync(offset, length);
+ }
+
+ public void commit(boolean sync) {
+ fileQueue.commit(sync);
+ }
+
+ public void cleanExpiredFile(long expireTimestamp) {
+ fileQueue.cleanExpiredFile(expireTimestamp);
+ }
+
+ public void destroyExpiredFile() {
+ fileQueue.destroyExpiredFile();
+
+ if (fileQueue.getFileSegmentCount() == 0) {
+ return;
+ }
+ TieredFileSegment fileSegment = fileQueue.getFileToWrite();
+ try {
+ if (System.currentTimeMillis() - fileSegment.getEndTimestamp() > (long) storeConfig.getCommitLogRollingInterval() * 60 * 60 * 1000
+ && fileSegment.getSize() > storeConfig.getCommitLogRollingMinimumSize()) {
+ fileQueue.rollingNewFile();
+ }
+ } catch (Exception e) {
+ logger.error("Rolling to next file failed:", e);
+ }
+ }
+
+ public void destroy() {
+ fileQueue.destroy();
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java
new file mode 100644
index 000000000..f4fecdc1a
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.BoundaryType;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+
+public class TieredConsumeQueue {
+ public static final int CONSUME_QUEUE_STORE_UNIT_SIZE = 8 /* commit log offset: long, 8 bytes */
+ + 4 /* message size: int, 4 bytes */
+ + 8 /* tag hash code: long, 8 bytes */;
+ private final MessageQueue messageQueue;
+ private final TieredMessageStoreConfig storeConfig;
+ private final TieredFileQueue fileQueue;
+
+
+ public TieredConsumeQueue(MessageQueue messageQueue, TieredMessageStoreConfig storeConfig) throws ClassNotFoundException, NoSuchMethodException {
+ this.messageQueue = messageQueue;
+ this.storeConfig = storeConfig;
+ this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, messageQueue, storeConfig);
+ }
+
+ public boolean isInitialized() {
+ return fileQueue.getBaseOffset() != -1;
+ }
+
+ public long getBaseOffset() {
+ return fileQueue.getBaseOffset();
+ }
+
+ public void setBaseOffset(long baseOffset) {
+ fileQueue.setBaseOffset(baseOffset);
+ }
+
+ public long getMinOffset() {
+ return fileQueue.getMinOffset();
+ }
+
+ public long getCommitOffset() {
+ return fileQueue.getCommitOffset();
+ }
+
+ public long getMaxOffset() {
+ return fileQueue.getMaxOffset();
+ }
+
+ public long getEndTimestamp() {
+ return fileQueue.getFileToWrite().getEndTimestamp();
+ }
+
+ public AppendResult append(final long offset, final int size, final long tagsCode, long timeStamp) {
+ return append(offset, size, tagsCode, timeStamp, false);
+ }
+
+ public AppendResult append(final long offset, final int size, final long tagsCode, long timeStamp, boolean commit) {
+ ByteBuffer cqItem = ByteBuffer.allocate(CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqItem.putLong(offset);
+ cqItem.putInt(size);
+ cqItem.putLong(tagsCode);
+ cqItem.flip();
+ return fileQueue.append(cqItem, timeStamp, commit);
+ }
+
+ public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+ return fileQueue.readAsync(offset, length);
+ }
+
+ public void commit(boolean sync) {
+ fileQueue.commit(sync);
+ }
+
+ public void cleanExpiredFile(long expireTimestamp) {
+ fileQueue.cleanExpiredFile(expireTimestamp);
+ }
+
+ public void destroyExpiredFile() {
+ fileQueue.destroyExpiredFile();
+ }
+
+ protected Pair<Long, Long> getQueueOffsetInFileByTime(long timestamp, BoundaryType boundaryType) {
+ TieredFileSegment fileSegment = fileQueue.getFileByTime(timestamp, boundaryType);
+ if (fileSegment == null) {
+ return Pair.of(-1L, -1L);
+ }
+ return Pair.of(fileSegment.getBaseOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE,
+ fileSegment.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1);
+ }
+
+ public void destroy() {
+ fileQueue.destroy();
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java
new file mode 100644
index 000000000..aa7e8e044
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.BoundaryType;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreException;
+import org.apache.rocketmq.store.tiered.metadata.FileSegmentMetadata;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+public class TieredFileQueue {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ // type of the segments held by this queue, fixed at construction
+ private final TieredFileSegment.FileSegmentType fileType;
+ private final MessageQueue messageQueue;
+ // -1 means "not set"; getFileToWrite() refuses to create a segment in that state
+ private long baseOffset = -1;
+ private final TieredMessageStoreConfig storeConfig;
+ private final TieredMetadataStore metadataStore;
+
+ // segments ordered by base offset; most accessors take fileSegmentLock before touching it
+ private final List<TieredFileSegment> fileSegmentList = new ArrayList<>();
+ // segments that commit(boolean) iterates over; full and fully committed segments are removed there
+ protected final List<TieredFileSegment> needCommitFileSegmentList = new CopyOnWriteArrayList<>();
+ private final ReentrantReadWriteLock fileSegmentLock = new ReentrantReadWriteLock();
+
+ // constructor of the configured segment implementation, resolved reflectively in the constructor
+ private final Constructor<? extends TieredFileSegment> fileSegmentConstructor;
+
+ /**
+ * Creates the queue, resolves the segment implementation named by
+ * {@code storeConfig.getTieredBackendServiceProvider()} and restores existing segments from the metadata store.
+ * For every file type except INDEX the restored segment sizes are then checked and fixed.
+ *
+ * @throws ClassNotFoundException if the configured provider class cannot be loaded
+ * @throws NoSuchMethodException if the provider class has no public
+ * (FileSegmentType, MessageQueue, long, TieredMessageStoreConfig) constructor
+ */
+ public TieredFileQueue(TieredFileSegment.FileSegmentType fileType, MessageQueue messageQueue,
+ TieredMessageStoreConfig storeConfig) throws ClassNotFoundException, NoSuchMethodException {
+ this.fileType = fileType;
+ this.messageQueue = messageQueue;
+ this.storeConfig = storeConfig;
+ this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+ // asSubclass throws ClassCastException when the provider is not a TieredFileSegment
+ Class<? extends TieredFileSegment> clazz = Class.forName(storeConfig.getTieredBackendServiceProvider()).asSubclass(TieredFileSegment.class);
+ fileSegmentConstructor = clazz.getConstructor(TieredFileSegment.FileSegmentType.class, MessageQueue.class, Long.TYPE, TieredMessageStoreConfig.class);
+ // must run after fileSegmentConstructor is set: loading creates segment instances through it
+ loadFromMetadata();
+ if (fileType != TieredFileSegment.FileSegmentType.INDEX) {
+ checkAndFixFileSize();
+ }
+ }
+
+ /**
+ * Returns the base offset of this queue, or -1 if it has not been set yet.
+ */
+ public long getBaseOffset() {
+ return baseOffset;
+ }
+
+ /**
+ * Sets the base offset of the queue. Only allowed while the queue holds no segment.
+ *
+ * @throws IllegalStateException if at least one file segment already exists
+ */
+ public void setBaseOffset(long baseOffset) {
+ if (!fileSegmentList.isEmpty()) {
+ throw new IllegalStateException("can not set base offset after file segment has been created");
+ }
+ this.baseOffset = baseOffset;
+ }
+
+ /**
+ * Returns the base offset of the first segment, or the queue base offset when there is no segment.
+ */
+ public long getMinOffset() {
+ fileSegmentLock.readLock().lock();
+ try {
+ return fileSegmentList.isEmpty() ? baseOffset : fileSegmentList.get(0).getBaseOffset();
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ public long getCommitOffset() {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (fileSegmentList.isEmpty()) {
+ return baseOffset;
+ }
+ return fileSegmentList.get(fileSegmentList.size() - 1).getCommitOffset();
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ public long getMaxOffset() {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (fileSegmentList.isEmpty()) {
+ return baseOffset;
+ }
+ return fileSegmentList.get(fileSegmentList.size() - 1).getMaxOffset();
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ public long getCommitMsgQueueOffset() {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (fileSegmentList.isEmpty()) {
+ return 0;
+ }
+ return fileSegmentList.get(fileSegmentList.size() - 1).getCommitMsgQueueOffset();
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Rebuilds the in-memory segment list from the metadata store.
+ * Segments whose metadata status is DELETED are skipped; SEALED segments are marked full.
+ * Afterwards the list is sorted by base offset, the queue base offset is taken from the first
+ * segment, and every segment that is not full is registered for commit.
+ */
+ private void loadFromMetadata() {
+ metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(), metadata -> {
+ if (metadata.getStatus() != FileSegmentMetadata.STATUS_DELETED) {
+ TieredFileSegment restored = newSegment(metadata.getBaseOffset(), false);
+ restored.initPosition(metadata.getSize());
+ restored.setBeginTimestamp(metadata.getBeginTimestamp());
+ restored.setEndTimestamp(metadata.getEndTimestamp());
+ if (metadata.getStatus() == FileSegmentMetadata.STATUS_SEALED) {
+ // the argument is "appendCoda": mark the segment full without appending a coda
+ restored.setFull(false);
+ }
+ // TODO check coda/size
+ fileSegmentList.add(restored);
+ }
+ });
+ if (fileSegmentList.isEmpty()) {
+ return;
+ }
+ fileSegmentList.sort(Comparator.comparingLong(TieredFileSegment::getBaseOffset));
+ baseOffset = fileSegmentList.get(0).getBaseOffset();
+ for (TieredFileSegment segment : fileSegmentList) {
+ if (!segment.isFull()) {
+ needCommitFileSegmentList.add(segment);
+ }
+ }
+ }
+
+ /**
+ * Verifies that consecutive segments are contiguous and repairs recorded sizes where possible.
+ * For each pair (pre, cur) whose offsets do not line up, the actual size of {@code pre} is fetched
+ * with {@code getSize()}; if that makes the pair contiguous, the position and metadata of {@code pre}
+ * are updated, otherwise the mismatch is only logged. Finally the position of the last segment is
+ * aligned with its actual size (its metadata is not rewritten here).
+ */
+ private void checkAndFixFileSize() {
+ for (int i = 1; i < fileSegmentList.size(); i++) {
+ TieredFileSegment pre = fileSegmentList.get(i - 1);
+ TieredFileSegment cur = fileSegmentList.get(i);
+ if (pre.getCommitOffset() != cur.getBaseOffset()) {
+ logger.warn("TieredFileQueue#checkAndFixFileSize: file segment has incorrect size: topic: {}, queue: {}, file type: {}, base offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset());
+ try {
+ long actualSize = pre.getSize();
+ if (pre.getBaseOffset() + actualSize != cur.getBaseOffset()) {
+ logger.error("[Bug]TieredFileQueue#checkAndFixFileSize: file segment has incorrect size and can not fix: topic: {}, queue: {}, file type: {}, base offset: {}, actual size: {}, next file offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset(), actualSize, cur.getBaseOffset());
+ continue;
+ }
+ pre.initPosition(actualSize);
+ metadataStore.updateFileSegment(pre);
+ } catch (Exception e) {
+ // pass the exception as the last argument so the cause and stack trace are logged
+ logger.error("TieredFileQueue#checkAndFixFileSize: fix file segment size failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset(), e);
+ }
+ }
+ }
+ if (!fileSegmentList.isEmpty()) {
+ TieredFileSegment lastFile = fileSegmentList.get(fileSegmentList.size() - 1);
+ long lastFileSize = lastFile.getSize();
+ if (lastFile.getCommitPosition() != lastFileSize) {
+ logger.warn("TieredFileQueue#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}", lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize);
+ lastFile.initPosition(lastFileSize);
+ }
+ }
+ }
+
+ /**
+ * Instantiates a segment through the reflectively resolved constructor.
+ * For non-INDEX types {@code createFile()} is called; when {@code createMetadata} is true the segment
+ * is also written to the metadata store.
+ *
+ * @return the new segment; {@code null} if instantiation itself failed. If a later step
+ * (createFile / metadata update) fails, the already constructed segment is still returned.
+ */
+ private TieredFileSegment newSegment(long baseOffset, boolean createMetadata) {
+ TieredFileSegment segment = null;
+ try {
+ segment = fileSegmentConstructor.newInstance(fileType, messageQueue, baseOffset, storeConfig);
+ if (fileType != TieredFileSegment.FileSegmentType.INDEX) {
+ segment.createFile();
+ }
+ if (createMetadata) {
+ metadataStore.updateFileSegment(segment);
+ }
+ } catch (Exception e) {
+ // failures are only logged; callers have to cope with a null or partially initialised segment
+ logger.error("create file segment failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, baseOffset, e);
+ }
+ return segment;
+ }
+
+ /**
+ * Marks the current writable segment as full and calls {@code getFileToWrite()} again so that a
+ * new segment gets created. Note that {@code getFileToWrite()} returns the old segment instead of
+ * creating a new one when committing it or updating its metadata fails.
+ */
+ public void rollingNewFile() {
+ TieredFileSegment segment = getFileToWrite();
+ segment.setFull();
+ // create new segment
+ getFileToWrite();
+ }
+
+ /**
+ * Returns the number of file segments currently held by this queue.
+ * The size is read under the read lock, like the other accessors of {@code fileSegmentList},
+ * because the backing {@code ArrayList} is modified under the write lock.
+ */
+ public int getFileSegmentCount() {
+ fileSegmentLock.readLock().lock();
+ try {
+ return fileSegmentList.size();
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the segment at the given list index.
+ *
+ * @param index position in the segment list
+ * @return the segment, or {@code null} when the index is negative or beyond the end of the list
+ * (e.g. the -1 returned by {@code getSegmentIndexByOffset})
+ */
+ @Nullable
+ protected TieredFileSegment getFileByIndex(int index) {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (index >= 0 && index < fileSegmentList.size()) {
+ return fileSegmentList.get(index);
+ }
+ return null;
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the segment that new data should be appended to, creating one if necessary.
+ * <p>
+ * Fast path (read lock): the last segment is returned if it is not full. Slow path (write lock):
+ * the last segment is re-checked, then committed and its metadata updated; only if both succeed is a
+ * new segment created at the old segment's max offset. If the commit or the metadata update fails,
+ * the old (full) segment is returned, so callers may still see FILE_FULL from it.
+ *
+ * @throws IllegalStateException if the base offset has not been set, or if the new segment could not
+ * be instantiated
+ */
+ protected TieredFileSegment getFileToWrite() {
+ if (baseOffset == -1) {
+ throw new IllegalStateException("need to set base offset before create file segment");
+ }
+ fileSegmentLock.readLock().lock();
+ try {
+ if (!fileSegmentList.isEmpty()) {
+ TieredFileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() - 1);
+ if (!fileSegment.isFull()) {
+ return fileSegment;
+ }
+ }
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ // Create new file segment
+ fileSegmentLock.writeLock().lock();
+ try {
+ long offset = baseOffset;
+ if (!fileSegmentList.isEmpty()) {
+ TieredFileSegment segment = fileSegmentList.get(fileSegmentList.size() - 1);
+ // re-check under the write lock: another thread may have rolled the file in the meantime
+ if (!segment.isFull()) {
+ return segment;
+ }
+ if (!segment.commit()) {
+ return segment;
+ }
+ try {
+ metadataStore.updateFileSegment(segment);
+ } catch (Exception e) {
+ // do not roll to a new file while the sealed segment's metadata is stale, but leave a trace
+ logger.error("update file segment metadata failed before rolling new file: topic: {}, queue: {}, file type: {}, base offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, segment.getBaseOffset(), e);
+ return segment;
+ }
+
+ offset = segment.getMaxOffset();
+ }
+ TieredFileSegment fileSegment = newSegment(offset, true);
+ if (fileSegment == null) {
+ // newSegment returns null when instantiation failed; never let a null entry into the lists
+ throw new IllegalStateException(String.format("create file segment failed: topic: %s, queue: %s, file type: %s, base offset: %d",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, offset));
+ }
+ fileSegmentList.add(fileSegment);
+ needCommitFileSegmentList.add(fileSegment);
+
+ Collections.sort(fileSegmentList);
+
+ // log the base offset of the new segment, not the base offset of the whole queue
+ logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", offset, fileSegment.getPath(), fileType);
+ return fileSegment;
+ } finally {
+ fileSegmentLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Selects a segment by timestamp.
+ * <ul>
+ * <li>UPPER: among segments whose end timestamp is &gt;= {@code timestamp}, the one with the smallest end timestamp;</li>
+ * <li>otherwise: among segments whose begin timestamp is &lt;= {@code timestamp}, the one with the largest begin timestamp.</li>
+ * </ul>
+ * When no segment matches, the last segment of the queue is returned.
+ *
+ * @return the selected segment, or {@code null} when the queue holds no segment
+ */
+ @Nullable
+ protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryType) {
+ fileSegmentLock.readLock().lock();
+ try {
+ boolean upper = boundaryType == BoundaryType.UPPER;
+ List<TieredFileSegment> candidates = new ArrayList<>();
+ for (TieredFileSegment segment : fileSegmentList) {
+ boolean matches = upper ? segment.getEndTimestamp() >= timestamp : segment.getBeginTimestamp() <= timestamp;
+ if (matches) {
+ candidates.add(segment);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1);
+ }
+ // stable sort, so segments with equal timestamps keep their list order
+ if (upper) {
+ candidates.sort(Comparator.comparingLong(TieredFileSegment::getEndTimestamp));
+ return candidates.get(0);
+ }
+ candidates.sort(Comparator.comparingLong(TieredFileSegment::getBeginTimestamp));
+ return candidates.get(candidates.size() - 1);
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the segments whose [begin timestamp, end timestamp] range overlaps [beginTime, endTime],
+ * in list order.
+ */
+ protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
+ fileSegmentLock.readLock().lock();
+ try {
+ List<TieredFileSegment> overlapping = new ArrayList<>();
+ for (TieredFileSegment segment : fileSegmentList) {
+ long latestBegin = Math.max(beginTime, segment.getBeginTimestamp());
+ long earliestEnd = Math.min(endTime, segment.getEndTimestamp());
+ if (latestBegin <= earliestEnd) {
+ overlapping.add(segment);
+ }
+ }
+ return overlapping;
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Binary-searches the segment list (ordered by base offset) for the segment containing {@code offset}.
+ *
+ * @return the list index of the segment, or -1 when the list is empty or the offset lies before the
+ * first segment's base offset or after the last segment's commit offset
+ */
+ protected int getSegmentIndexByOffset(long offset) {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (fileSegmentList.size() <= 0) {
+ return -1;
+ }
+
+ int left = 0;
+ int right = fileSegmentList.size() - 1;
+ int mid = (left + right) / 2;
+
+ long firstSegmentOffset = fileSegmentList.get(left).getBaseOffset();
+ long lastSegmentOffset = fileSegmentList.get(right).getCommitOffset();
+ long midSegmentOffset = fileSegmentList.get(mid).getBaseOffset();
+
+ // note: an offset equal to the last commit offset is still accepted here
+ if (offset < firstSegmentOffset || offset > lastSegmentOffset) {
+ return -1;
+ }
+
+ // narrow [left, right] until at most two neighbouring candidates remain
+ while (left < right - 1) {
+ if (offset == midSegmentOffset) {
+ return mid;
+ }
+ if (offset < midSegmentOffset) {
+ right = mid;
+ } else {
+ left = mid;
+ }
+ mid = (left + right) / 2;
+ midSegmentOffset = fileSegmentList.get(mid).getBaseOffset();
+ }
+ // here mid == left; pick right only if the offset has reached its base offset
+ return offset < fileSegmentList.get(right).getBaseOffset() ? mid : right;
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Appends a buffer without a timestamp ({@code Long.MAX_VALUE} is passed as the timestamp) and with
+ * the {@code commit} flag set to {@code false}.
+ */
+ public AppendResult append(ByteBuffer byteBuf) {
+ return append(byteBuf, Long.MAX_VALUE, false);
+ }
+
+ /**
+ * Appends a buffer with the given timestamp and with the {@code commit} flag set to {@code false}.
+ */
+ public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+ return append(byteBuf, timeStamp, false);
+ }
+
+ /**
+ * Appends a buffer to the current writable segment.
+ *
+ * @param byteBuf data to append
+ * @param timeStamp timestamp forwarded to the segment
+ * @param commit when true and the segment reports BUFFER_FULL, the segment is committed
+ * synchronously and the append is retried once if that commit succeeds
+ * @return the append result; when the segment reports FILE_FULL the append is retried once on the
+ * segment returned by a fresh {@code getFileToWrite()} call and that result is returned
+ */
+ public AppendResult append(ByteBuffer byteBuf, long timeStamp, boolean commit) {
+ TieredFileSegment target = getFileToWrite();
+ AppendResult outcome = target.append(byteBuf, timeStamp);
+ if (outcome == AppendResult.BUFFER_FULL && commit) {
+ if (target.commit()) {
+ outcome = target.append(byteBuf, timeStamp);
+ }
+ }
+ if (outcome != AppendResult.FILE_FULL) {
+ return outcome;
+ }
+ // write to new file
+ return getFileToWrite().append(byteBuf, timeStamp);
+ }
+
+ /**
+ * Closes and drops segments whose end timestamp in the metadata store is older than
+ * {@code expireTimestamp}. Only a leading run of expired segments is removed: the scan stops at the
+ * first segment that is not expired. Removed segments are closed, taken out of both lists and their
+ * metadata is updated. Afterwards the queue base offset is reset to the first remaining segment, or
+ * to -1 (CONSUME_QUEUE) / 0 (other types) when nothing is left.
+ *
+ * @param expireTimestamp segments with a smaller end timestamp are considered expired
+ */
+ public void cleanExpiredFile(long expireTimestamp) {
+ Set<Long> needToDeleteSet = new HashSet<>();
+ try {
+ metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(),
+ metadata -> {
+ if (metadata.getEndTimestamp() < expireTimestamp) {
+ needToDeleteSet.add(metadata.getBaseOffset());
+ }
+ });
+ } catch (Exception e) {
+ // pass the exception as the last argument so the cause and stack trace are logged
+ logger.error("clean expired failed: topic: {}, queue: {}, file type: {}, expire timestamp: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, expireTimestamp, e);
+ }
+
+ if (needToDeleteSet.isEmpty()) {
+ return;
+ }
+
+ fileSegmentLock.writeLock().lock();
+ try {
+ for (int i = 0; i < fileSegmentList.size(); i++) {
+ try {
+ TieredFileSegment fileSegment = fileSegmentList.get(i);
+ if (needToDeleteSet.contains(fileSegment.getBaseOffset())) {
+ fileSegment.close();
+ fileSegmentList.remove(fileSegment);
+ needCommitFileSegmentList.remove(fileSegment);
+ // the list shrank by one: stay on the same index for the next round
+ i--;
+ metadataStore.updateFileSegment(fileSegment);
+ logger.info("expired file {} is been cleaned", fileSegment.getPath());
+ } else {
+ break;
+ }
+ } catch (Exception e) {
+ logger.error("clean expired file failed: topic: {}, queue: {}, file type: {}, expire timestamp: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, expireTimestamp, e);
+ }
+ }
+ if (fileSegmentList.size() > 0) {
+ baseOffset = fileSegmentList.get(0).getBaseOffset();
+ } else if (fileType == TieredFileSegment.FileSegmentType.CONSUME_QUEUE) {
+ baseOffset = -1;
+ } else {
+ baseOffset = 0;
+ }
+ } finally {
+ fileSegmentLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Physically destroys every segment whose metadata status is DELETED. For each such entry a
+ * segment object is created, {@code destroyFile()} is called, and the metadata entry is removed
+ * once {@code exists()} reports that the file is gone. A failure on one segment is logged and does
+ * not stop the iteration.
+ */
+ public void destroyExpiredFile() {
+ try {
+ metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(),
+ metadata -> {
+ if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
+ try {
+ TieredFileSegment fileSegment = newSegment(metadata.getBaseOffset(), false);
+ fileSegment.destroyFile();
+ if (!fileSegment.exists()) {
+ metadataStore.deleteFileSegment(fileSegment);
+ logger.info("expired file {} is been destroyed", fileSegment.getPath());
+ }
+ } catch (Exception e) {
+ logger.error("destroy expired failed: topic: {}, queue: {}, file type: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ // pass the exception as the last argument so the cause and stack trace are logged
+ logger.error("destroy expired file failed: topic: {}, queue: {}, file type: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, e);
+ }
+ }
+
+ public void commit(boolean sync) {
+ ArrayList<CompletableFuture<Void>> futureList = new ArrayList<>();
+ try {
+ for (TieredFileSegment segment : needCommitFileSegmentList) {
+ if (segment.isClosed()) {
+ continue;
+ }
+ futureList.add(segment.commitAsync()
+ .thenAccept(success -> {
+ try {
+ metadataStore.updateFileSegment(segment);
+ } catch (Exception e) {
+ // TODO handle update segment metadata failed exception
+ logger.error("update file segment metadata failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, segment.getBaseOffset(), e);
+ }
+ if (segment.isFull() && !segment.needCommit()) {
+ needCommitFileSegmentList.remove(segment);
+ }
+ }));
+ }
+ } catch (Exception e) {
+ logger.error("commit file segment failed: topic: {}, queue: {}, file type: {}", messageQueue.getTopic(), messageQueue.getQueueId(), fileType, e);
+ }
+ if (sync) {
+ CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
+ }
+ }
+
+ /**
+ * Reads {@code length} bytes starting at queue offset {@code offset}. If the requested range runs
+ * past the commit offset of the segment containing {@code offset} and a following segment exists,
+ * the read is split across both segments and the two results are concatenated.
+ *
+ * @throws TieredStoreException with ILLEGAL_OFFSET, thrown synchronously (not through the returned
+ * future), when no segment contains {@code offset}
+ */
+ public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+ int index = getSegmentIndexByOffset(offset);
+ if (index == -1) {
+ String errorMsg = String.format("TieredFileQueue#readAsync: offset is illegal, topic: %s, queue: %s, file type: %s, start: %d, length: %d, file num: %d",
+ messageQueue.getTopic(), messageQueue.getQueueId(), fileType, offset, length, fileSegmentList.size());
+ logger.error(errorMsg);
+ throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, errorMsg);
+ }
+ TieredFileSegment fileSegment1;
+ TieredFileSegment fileSegment2 = null;
+ // NOTE(review): the index was computed under an earlier lock acquisition; the list may have
+ // changed in between — confirm callers cannot race with cleanExpiredFile/destroy
+ fileSegmentLock.readLock().lock();
+ try {
+ fileSegment1 = fileSegmentList.get(index);
+ if (offset + length > fileSegment1.getCommitOffset()) {
+ if (fileSegmentList.size() > index + 1) {
+ fileSegment2 = fileSegmentList.get(index + 1);
+ }
+ }
+ } finally {
+ fileSegmentLock.readLock().unlock();
+ }
+ if (fileSegment2 == null) {
+ // single-segment read; the segment itself trims the length to its commit position
+ return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), length);
+ }
+ // bytes available in the first segment; the remainder is read from the start of the second one
+ int segment1Length = (int) (fileSegment1.getCommitOffset() - offset);
+ return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), segment1Length)
+ .thenCombine(fileSegment2.readAsync(0, length - segment1Length), (buffer1, buffer2) -> {
+ ByteBuffer compositeBuffer = ByteBuffer.allocate(buffer1.remaining() + buffer2.remaining());
+ compositeBuffer.put(buffer1).put(buffer2);
+ compositeBuffer.flip();
+ return compositeBuffer;
+ });
+ }
+
+ public void destroy() {
+ fileSegmentLock.writeLock().lock();
+ try {
+ for (TieredFileSegment fileSegment : fileSegmentList) {
+ fileSegment.close();
+ try {
+ metadataStore.updateFileSegment(fileSegment);
+ } catch (Exception e) {
+ logger.error("TieredFileQueue#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
+ }
+ fileSegment.destroyFile();
+ }
+ fileSegmentList.clear();
+ needCommitFileSegmentList.clear();
+ } finally {
+ fileSegmentLock.writeLock().unlock();
+ }
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java
new file mode 100644
index 000000000..9cf19c638
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import com.google.common.base.Stopwatch;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreException;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+public abstract class TieredFileSegment implements Comparable<TieredFileSegment> {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ // once set by close(), append and commit are rejected
+ private volatile boolean closed = false;
+ // guards uploadBufferList, full and the coda creation
+ private final ReentrantLock bufferLock = new ReentrantLock();
+ // single permit: at most one commit may be in flight
+ private final Semaphore commitLock = new Semaphore(1);
+ // buffers appended but not yet handed to a commit
+ private List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ // NOTE(review): written under bufferLock but read by isFull() without it and not volatile — confirm visibility is acceptable
+ private boolean full;
+ protected final FileSegmentType fileType;
+ protected final MessageQueue messageQueue;
+ protected final TieredMessageStoreConfig storeConfig;
+ protected final long baseOffset;
+ // positions are relative to baseOffset: commitPosition <= appendPosition
+ private volatile long commitPosition;
+ private volatile long appendPosition;
+ private final long maxSize;
+ // Long.MAX_VALUE means "not set yet"
+ private long beginTimestamp = Long.MAX_VALUE;
+ private long endTimestamp = Long.MAX_VALUE;
+ // only used in commitLog
+ private long commitMsgQueueOffset = 0;
+ // created by appendCoda() when a commit log segment is sealed; null otherwise
+ private ByteBuffer codaBuffer;
+
+ // the most recently started commit; commit() joins it when a new commit could not be started
+ private CompletableFuture<Boolean> inflightCommitRequest = CompletableFuture.completedFuture(false);
+
+ /**
+ * Initialises an empty segment (commit and append position 0) and derives its maximum size from
+ * the file type: the configured commit log / consume queue size, or unlimited for INDEX.
+ *
+ * @throws IllegalArgumentException if the file type is not one of the handled constants
+ */
+ public TieredFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
+ TieredMessageStoreConfig storeConfig) {
+ this.fileType = fileType;
+ this.messageQueue = messageQueue;
+ this.storeConfig = storeConfig;
+ this.baseOffset = baseOffset;
+ this.commitPosition = 0;
+ this.appendPosition = 0;
+ switch (fileType) {
+ case COMMIT_LOG:
+ this.maxSize = storeConfig.getTieredStoreCommitLogMaxSize();
+ break;
+ case CONSUME_QUEUE:
+ this.maxSize = storeConfig.getTieredStoreConsumeQueueMaxSize();
+ break;
+ case INDEX:
+ // no size limit: append() seals an index segment after a single buffer instead
+ this.maxSize = Long.MAX_VALUE;
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported file type: " + fileType);
+ }
+ }
+
+ /**
+ * Orders segments by ascending base offset.
+ */
+ @Override
+ public int compareTo(TieredFileSegment o) {
+ return Long.compare(this.baseOffset, o.baseOffset);
+ }
+
+ /** Returns the offset at which this segment starts. */
+ public long getBaseOffset() {
+ return baseOffset;
+ }
+
+ /** Returns the absolute commit offset: base offset plus commit position. */
+ public long getCommitOffset() {
+ return baseOffset + commitPosition;
+ }
+
+ /** Returns the commit position relative to the base offset. */
+ public long getCommitPosition() {
+ return commitPosition;
+ }
+
+ /** Returns the queue offset recorded by the last successful commit of a commit log segment; 0 otherwise. */
+ public long getCommitMsgQueueOffset() {
+ return commitMsgQueueOffset;
+ }
+
+ /** Returns the absolute append offset: base offset plus append position. */
+ public long getMaxOffset() {
+ return baseOffset + appendPosition;
+ }
+
+ /** Returns the maximum size chosen for this segment's file type in the constructor. */
+ public long getMaxSize() {
+ return maxSize;
+ }
+
+ /** Returns the begin timestamp, or {@code Long.MAX_VALUE} if none has been recorded. */
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ protected void setBeginTimestamp(long beginTimestamp) {
+ this.beginTimestamp = beginTimestamp;
+ }
+
+ /** Returns the end timestamp, or {@code Long.MAX_VALUE} if none has been recorded. */
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ protected void setEndTimestamp(long endTimestamp) {
+ this.endTimestamp = endTimestamp;
+ }
+
+ /** Returns whether this segment has been marked full. */
+ public boolean isFull() {
+ return full;
+ }
+
+ /** Marks the segment full and, for commit log segments, appends the coda. */
+ public void setFull() {
+ setFull(true);
+ }
+
+ /**
+ * Marks the segment full.
+ *
+ * @param appendCoda whether a coda should be appended; only has an effect for COMMIT_LOG segments.
+ * Note that the segment becomes full regardless of this flag.
+ */
+ public void setFull(boolean appendCoda) {
+ bufferLock.lock();
+ try {
+ full = true;
+ if (fileType == FileSegmentType.COMMIT_LOG && appendCoda) {
+ appendCoda();
+ }
+ } finally {
+ bufferLock.unlock();
+ }
+ }
+
+ /** Returns whether {@link #close()} has been called. */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /** Marks the segment closed; only the flag is set here, no resource is released. */
+ public void close() {
+ closed = true;
+ }
+
+ public FileSegmentType getFileType() {
+ return fileType;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ /** Sets both the commit position and the append position to {@code pos}. */
+ public void initPosition(long pos) {
+ this.commitPosition = pos;
+ this.appendPosition = pos;
+ }
+
+ /**
+ * Swaps the pending buffer list for a fresh empty one under {@code bufferLock}.
+ *
+ * @return the buffers that were pending before the swap
+ */
+ private List<ByteBuffer> rollingUploadBuffer() {
+ bufferLock.lock();
+ try {
+ List<ByteBuffer> tmp = uploadBufferList;
+ uploadBufferList = new ArrayList<>();
+ return tmp;
+ } finally {
+ bufferLock.unlock();
+ }
+ }
+
+ /**
+ * Puts the buffers of a failed commit back in front of the pending buffers so that they are
+ * uploaded again by the next commit. All returned buffers (and the coda, if the stream carried one)
+ * are rewound first.
+ */
+ private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
+ bufferLock.lock();
+ try {
+ List<ByteBuffer> restored = inputStream.getUploadBufferList();
+ restored.forEach(ByteBuffer::rewind);
+ // buffers appended while the commit was running go after the ones being returned
+ restored.addAll(uploadBufferList);
+ uploadBufferList = restored;
+ boolean streamHadCoda = inputStream.getCodaBuffer() != null;
+ if (streamHadCoda) {
+ codaBuffer.rewind();
+ }
+ } finally {
+ bufferLock.unlock();
+ }
+ }
+
+ /**
+ * Queues a buffer for upload.
+ *
+ * @param byteBuf data to append; its remaining bytes are counted
+ * @param timeStamp timestamp of the data; {@code Long.MAX_VALUE} leaves the segment timestamps untouched
+ * @return FILE_CLOSE if the segment is closed, FILE_FULL if it is (or just became) full,
+ * BUFFER_FULL if too many buffers are pending, otherwise SUCCESS
+ */
+ public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+ if (closed) {
+ return AppendResult.FILE_CLOSE;
+ }
+ bufferLock.lock();
+ try {
+ if (full || codaBuffer != null) {
+ return AppendResult.FILE_FULL;
+ }
+
+ if (fileType == FileSegmentType.INDEX) {
+ // an index segment takes exactly one buffer: timestamps come from its header, then it is sealed
+ beginTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+ endTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+ appendPosition += byteBuf.remaining();
+ uploadBufferList.add(byteBuf);
+ setFull();
+ return AppendResult.SUCCESS;
+ }
+
+ if (appendPosition + byteBuf.remaining() > maxSize) {
+ // the buffer does not fit: seal the segment, the caller is expected to retry on a new one
+ setFull();
+ return AppendResult.FILE_FULL;
+ }
+ // soft threshold: kick off a background commit but keep accepting data
+ if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
+ || appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) {
+ commitAsync();
+ }
+ // hard threshold: reject the append until pending buffers have been committed
+ if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
+ logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}",
+ getPath(), uploadBufferList.size());
+ return AppendResult.BUFFER_FULL;
+ }
+ if (timeStamp != Long.MAX_VALUE) {
+ endTimestamp = timeStamp;
+ if (beginTimestamp == Long.MAX_VALUE) {
+ beginTimestamp = timeStamp;
+ }
+ }
+ appendPosition += byteBuf.remaining();
+ uploadBufferList.add(byteBuf);
+ return AppendResult.SUCCESS;
+ } finally {
+ bufferLock.unlock();
+ }
+ }
+
+ /**
+ * Creates the coda (size, blank magic code, end timestamp) once and accounts for its size in the
+ * append position. Does nothing if a coda already exists.
+ */
+ private void appendCoda() {
+ if (codaBuffer == null) {
+ ByteBuffer coda = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+ coda.putInt(TieredCommitLog.CODA_SIZE)
+ .putInt(TieredCommitLog.BLANK_MAGIC_CODE)
+ .putLong(endTimestamp);
+ coda.flip();
+ codaBuffer = coda;
+ appendPosition += TieredCommitLog.CODA_SIZE;
+ }
+ }
+
+ /**
+ * Blocking variant of {@link #readAsync(long, int)}: joins the returned future.
+ */
+ public ByteBuffer read(long position, int length) {
+ return readAsync(position, length).join();
+ }
+
+ /**
+ * Reads committed data from this segment.
+ * <p>
+ * A request that reaches past the commit position is trimmed to the committed range. All argument
+ * errors are reported through the returned future, never thrown.
+ *
+ * @param position start position relative to the segment's base offset
+ * @param length number of bytes requested
+ * @return a future holding the data, or completed exceptionally with a {@link TieredStoreException}:
+ * ILLEGAL_PARAM for a negative position/length, a zero length, a position beyond the commit
+ * position, or a trimmed consume queue read that is not a multiple of the unit size;
+ * NO_NEW_DATA when the position equals the commit position
+ */
+ public CompletableFuture<ByteBuffer> readAsync(long position, int length) {
+ CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
+ if (position < 0 || length < 0) {
+ future.completeExceptionally(
+ new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "position or length is negative"));
+ return future;
+ }
+ if (length == 0) {
+ future.completeExceptionally(
+ new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "length is zero"));
+ return future;
+ }
+ // snapshot the volatile field so that every check below works on the same value
+ final long committed = commitPosition;
+ if (position > committed) {
+ // without this guard the trimmed length would become negative and be passed on to read0
+ future.completeExceptionally(
+ new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "request position is greater than commit position"));
+ return future;
+ }
+ if (position + length > committed) {
+ logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
+ " correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
+ getPath(), position, committed, length, committed - position);
+ length = (int) (committed - position);
+ if (length == 0) {
+ future.completeExceptionally(
+ new TieredStoreException(TieredStoreErrorCode.NO_NEW_DATA, "request position is equal to commit position"));
+ return future;
+ }
+ if (fileType == FileSegmentType.CONSUME_QUEUE && length % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
+ future.completeExceptionally(
+ new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "position and length is illegal"));
+ return future;
+ }
+ }
+ return read0(position, length);
+ }
+
+ /** Returns true when data has been appended beyond the commit position. */
+ public boolean needCommit() {
+ return appendPosition > commitPosition;
+ }
+
+ /**
+ * Commits synchronously.
+ *
+ * @return false if the segment is closed; otherwise the result of the commit started here or, when
+ * that attempt returned false (e.g. because another commit holds the permit), the result of
+ * the most recently started in-flight commit
+ */
+ public boolean commit() {
+ if (closed) {
+ return false;
+ }
+ Boolean result = commitAsync().join();
+ if (!result) {
+ result = inflightCommitRequest.join();
+ }
+ return result;
+ }
+
+ /**
+ * Starts uploading all pending buffers (plus the coda, if present) to the backend.
+ * <p>
+ * At most one commit runs at a time, guarded by the single permit of {@code commitLock}. The permit
+ * is given back on every path that took it: when there turns out to be nothing to upload, when the
+ * upload future completes, and when {@code commit0} throws synchronously.
+ *
+ * @return a future that yields true when the data was committed or there was nothing to commit, and
+ * false when the segment is closed, another commit is in flight, or the upload failed
+ */
+ public CompletableFuture<Boolean> commitAsync() {
+ if (closed) {
+ return CompletableFuture.completedFuture(false);
+ }
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ if (!needCommit()) {
+ return CompletableFuture.completedFuture(true);
+ }
+ if (commitLock.drainPermits() <= 0) {
+ // another commit is in flight
+ return CompletableFuture.completedFuture(false);
+ }
+ List<ByteBuffer> bufferList = rollingUploadBuffer();
+ int bufferSize = 0;
+ for (ByteBuffer buffer : bufferList) {
+ bufferSize += buffer.remaining();
+ }
+ if (codaBuffer != null) {
+ bufferSize += codaBuffer.remaining();
+ }
+ if (bufferSize == 0) {
+ // nothing to upload: give the permit back, otherwise every later commit would find no
+ // permit and return false forever
+ commitLock.release();
+ return CompletableFuture.completedFuture(true);
+ }
+ TieredFileSegmentInputStream inputStream = new TieredFileSegmentInputStream(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
+ int finalBufferSize = bufferSize;
+ try {
+ inflightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
+ .thenApply(result -> {
+ if (result) {
+ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+ commitMsgQueueOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+ }
+ commitPosition += finalBufferSize;
+ return true;
+ }
+ // upload rejected: queue the buffers again for the next commit
+ sendBackBuffer(inputStream);
+ return false;
+ }).exceptionally(e -> handleCommitException(inputStream, e))
+ .whenComplete((result, e) -> releaseCommitPermit(stopwatch, bufferList.size(), finalBufferSize));
+ return inflightCommitRequest;
+ } catch (Exception e) {
+ handleCommitException(inputStream, e);
+ releaseCommitPermit(stopwatch, bufferList.size(), finalBufferSize);
+ }
+ return CompletableFuture.completedFuture(false);
+ }
+
+ /**
+ * Returns the commit permit after a commit attempt and logs how long the attempt took; logs an
+ * error instead if the permit has unexpectedly been released already.
+ */
+ private void releaseCommitPermit(Stopwatch stopwatch, int itemCount, int bufferSize) {
+ if (commitLock.availablePermits() == 0) {
+ logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), itemCount, bufferSize);
+ commitLock.release();
+ } else {
+ logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
+ }
+ }
+
+ /**
+ * Recovers from a failed commit: the buffers are queued again and the commit position is
+ * reconciled with the size actually present in the backend.
+ *
+ * @param inputStream the stream of the failed commit, carrying the buffers to re-queue
+ * @param e the failure; if it wraps another throwable, the wrapped cause is inspected
+ * @return always false, so it can serve directly as the result of the failed commit
+ */
+ private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) {
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ sendBackBuffer(inputStream);
+ long realSize = 0;
+ // prefer the position carried by the exception, fall back to asking the backend for the size
+ if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) {
+ realSize = ((TieredStoreException) cause).getPosition();
+ }
+ if (realSize <= 0) {
+ realSize = getSize();
+ }
+ if (realSize > 0 && realSize > commitPosition) {
+ logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
+ // TODO check if this diff part is uploaded to backend storage
+ // shift both positions forward, keeping the amount of uncommitted data unchanged
+ long diff = appendPosition - commitPosition;
+ commitPosition = realSize;
+ appendPosition = realSize + diff;
+ // TODO check if appendPosition is large than maxOffset
+ } else if (realSize < commitPosition) {
+ logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
+ }
+ return false;
+ }
+
+ /**
+ * Kind of data stored in a segment, each with a stable numeric code.
+ */
+ public enum FileSegmentType {
+ COMMIT_LOG(0),
+ CONSUME_QUEUE(1),
+ INDEX(2);
+
+ // numeric code of the constant; final because enum constants must be immutable
+ private final int type;
+
+ FileSegmentType(int type) {
+ this.type = type;
+ }
+
+ /** Returns the numeric code of this type. */
+ public int getType() {
+ return type;
+ }
+
+ /**
+ * Maps a numeric code back to its constant.
+ *
+ * @throws IllegalStateException if the code does not belong to any constant
+ */
+ public static FileSegmentType valueOf(int type) {
+ switch (type) {
+ case 0:
+ return COMMIT_LOG;
+ case 1:
+ return CONSUME_QUEUE;
+ case 2:
+ return INDEX;
+ default:
+ throw new IllegalStateException("Unexpected value: " + type);
+ }
+ }
+ }
+
+ /**
+ * Presents the pending buffers of one commit (followed by the coda for commit log segments) as a
+ * single byte stream for the backend upload. Only the single-byte {@code read()} is implemented here.
+ */
+ protected static class TieredFileSegmentInputStream extends InputStream {
+
+ private final FileSegmentType fileType;
+ private final List<ByteBuffer> uploadBufferList;
+ // index of the buffer currently being read
+ private int bufferReadIndex = 0;
+ // read position inside the current buffer; only used for commit log streams
+ private int readOffset = 0;
+ // only used in commitLog
+ private long commitLogOffset;
+ // big-endian bytes of commitLogOffset, substituted into the stream by read()
+ private final ByteBuffer commitLogOffsetBuffer = ByteBuffer.allocate(8);
+ private final ByteBuffer codaBuffer;
+ private ByteBuffer curBuffer;
+ private final int contentLength;
+ private int readBytes = 0;
+
+ /**
+ * @param startOffset offset assigned to the first buffer (commit log only)
+ * @param contentLength total number of bytes this stream is expected to deliver; used by {@link #available()}
+ */
+ public TieredFileSegmentInputStream(FileSegmentType fileType, long startOffset,
+ List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
+ this.fileType = fileType;
+ this.commitLogOffset = startOffset;
+ this.commitLogOffsetBuffer.putLong(0, startOffset);
+ this.uploadBufferList = uploadBufferList;
+ this.codaBuffer = codaBuffer;
+ this.contentLength = contentLength;
+ if (uploadBufferList.size() > 0) {
+ this.curBuffer = uploadBufferList.get(0);
+ }
+ if (fileType == FileSegmentType.INDEX && uploadBufferList.size() != 1) {
+ logger.error("[Bug]TieredFileSegmentInputStream: index file must have only one buffer");
+ }
+ }
+
+ public List<ByteBuffer> getUploadBufferList() {
+ return uploadBufferList;
+ }
+
+ public ByteBuffer getCodaBuffer() {
+ return codaBuffer;
+ }
+
+ /** Returns the number of bytes not yet delivered, based on the declared content length. */
+ @Override
+ public int available() {
+ return contentLength - readBytes;
+ }
+
+ /**
+ * Returns the next byte (0-255) or -1 at the end of the stream.
+ */
+ @Override
+ public int read() {
+ if (bufferReadIndex >= uploadBufferList.size()) {
+ return readCoda();
+ }
+
+ int res;
+ switch (fileType) {
+ case COMMIT_LOG:
+ // commit log buffers are read with absolute gets, so their position is left untouched
+ if (readOffset >= curBuffer.remaining()) {
+ bufferReadIndex++;
+ if (bufferReadIndex >= uploadBufferList.size()) {
+ return readCoda();
+ }
+ curBuffer = uploadBufferList.get(bufferReadIndex);
+ // the next buffer starts right after the bytes consumed from the previous one
+ commitLogOffset += readOffset;
+ commitLogOffsetBuffer.putLong(0, commitLogOffset);
+ readOffset = 0;
+ }
+ // bytes in [PHYSICAL_OFFSET_POSITION, SYS_FLAG_OFFSET_POSITION) are replaced by the offset
+ // of this buffer within the tiered commit log (presumably the physical offset field — confirm)
+ if (readOffset >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readOffset < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+ res = commitLogOffsetBuffer.get(readOffset - MessageBufferUtil.PHYSICAL_OFFSET_POSITION) & 0xff;
+ readOffset++;
+ } else {
+ res = curBuffer.get(readOffset++) & 0xff;
+ }
+ break;
+ case CONSUME_QUEUE:
+ // consume queue buffers are consumed with relative gets
+ if (!curBuffer.hasRemaining()) {
+ bufferReadIndex++;
+ if (bufferReadIndex >= uploadBufferList.size()) {
+ return -1;
+ }
+ curBuffer = uploadBufferList.get(bufferReadIndex);
+ }
+ res = curBuffer.get() & 0xff;
+ break;
+ case INDEX:
+ // a single buffer is expected, see the constructor check
+ if (!curBuffer.hasRemaining()) {
+ return -1;
+ }
+ res = curBuffer.get() & 0xff;
+ break;
+ default:
+ throw new IllegalStateException("unknown file type");
+ }
+ readBytes++;
+ return res;
+ }
+
+ /**
+ * Returns the next coda byte, or -1 when this is not a commit log stream, there is no coda, or
+ * the coda has been fully read.
+ */
+ private int readCoda() {
+ if (fileType != FileSegmentType.COMMIT_LOG || codaBuffer == null) {
+ return -1;
+ }
+ if (!codaBuffer.hasRemaining()) {
+ return -1;
+ }
+ readBytes++;
+ return codaBuffer.get() & 0xff;
+ }
+ }
+
+ /** Returns the path of this segment; used in log messages. */
+ public abstract String getPath();
+
+ /** Returns the actual size of the segment; used to reconcile the recorded positions. */
+ public abstract long getSize();
+
+ /** Returns whether the segment's file exists. */
+ protected abstract boolean exists();
+
+ /** Creates the segment's file. */
+ protected abstract void createFile();
+
+ /** Destroys the segment's file. */
+ protected abstract void destroyFile();
+
+ /**
+ * Reads {@code length} bytes starting at {@code position} (relative to the base offset).
+ * Arguments have already been validated and trimmed by {@code readAsync}.
+ */
+ protected abstract CompletableFuture<ByteBuffer> read0(long position, int length);
+
+ /**
+ * Uploads {@code length} bytes from {@code inputStream} at {@code position}.
+ *
+ * @param append passed as false for INDEX segments and true otherwise
+ * @return a future yielding true when the upload succeeded
+ */
+ protected abstract CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position,
+ int length, boolean append);
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java
new file mode 100644
index 000000000..0f423a4a3
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.index.IndexHeader;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.common.TieredStoreExecutor;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+ /**
+  * Hash index over message keys for the tiered store.
+  *
+  * <p>New entries are written into a local memory-mapped "current" file ({@code 0000}). When that file
+  * holds {@code maxIndexNum} entries, or has been idle for longer than the configured rolling interval,
+  * it is sealed (end magic code written into its last four bytes) and renamed to the "previous" file
+  * ({@code 1111}). A {@link CompactTask} then rewrites the previous file into a compact layout and
+  * appends it to the {@link TieredFileQueue}, which is what {@link #queryAsync} reads from.
+  *
+  * <p>Layout of the local file: a 28-byte header (magic, begin timestamp, end timestamp, slot number,
+  * index number), {@code maxHashSlotNum} slots of 8 bytes, then 32-byte index entries
+  * (hash, topic id, queue id, offset, size, time diff, previous entry of the same slot).
+  *
+  * <p>NOTE(review): the file size is computed with {@code IndexHeader.INDEX_HEADER_SIZE} while all
+  * offsets use {@code INDEX_FILE_HEADER_SIZE} - confirm the difference is intended.
+  */
+ public class TieredIndexFile {
+     private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+     // '+' binds tighter than '^' in Java, so these evaluate to 0xCCDDEEFF ^ (1880681586 + 4) and
+     // 0xCCDDEEFF ^ (1880681586 + 8). The values are part of the file format; do not "fix" them.
+     public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+     public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+     private static final int INDEX_FILE_HEADER_SIZE = 28;
+     private static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+     // Entry size in the local file; the last 4 bytes link to the previous entry of the same slot.
+     private static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+     // Entry size after compaction; the 4-byte link is dropped.
+     public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
+
+     public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
+     public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
+     public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
+     private static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
+     private static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
+
+     private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
+     private static final String CUR_INDEX_FILE_NAME = "0000";
+     private static final String PRE_INDEX_FILE_NAME = "1111";
+     private static final String COMPACT_FILE_NAME = "2222";
+
+     private final TieredMessageStoreConfig storeConfig;
+     private final TieredFileQueue fileQueue;
+     private final int maxHashSlotNum;
+     private final int maxIndexNum;
+     private final int fileMaxSize;
+     private final String curFilePath;
+     private final String preFilepath;
+     private MappedFile preMappedFile;
+     private MappedFile curMappedFile;
+
+     // Guards every read/write of the current mapped file and the rolling of files.
+     private final ReentrantLock curFileLock = new ReentrantLock();
+     // Replaced under the TieredIndexFile.class monitor whenever a new compaction is submitted.
+     private Future<Void> inflightCompactFuture = CompletableFuture.completedFuture(null);
+
+     /**
+      * Opens (or creates) the local index files and schedules a task, every 10 seconds, that rolls an
+      * idle current file and submits compaction of the previous file.
+      *
+      * @param storeConfig tiered store configuration (paths, slot/index limits, idle interval)
+      * @throws ClassNotFoundException declared by the {@link TieredFileQueue} constructor
+      * @throws NoSuchMethodException declared by the {@link TieredFileQueue} constructor
+      * @throws IOException if a local index file cannot be mapped
+      */
+     protected TieredIndexFile(TieredMessageStoreConfig storeConfig)
+         throws ClassNotFoundException, NoSuchMethodException, IOException {
+         this.storeConfig = storeConfig;
+         this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.INDEX, new MessageQueue(TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0), storeConfig);
+         if (fileQueue.getBaseOffset() == -1) {
+             fileQueue.setBaseOffset(0);
+         }
+         this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+         this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+
+         // The trailing 4 bytes hold the "sealed" marker (end magic code).
+         this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE) + (this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
+         this.curFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + CUR_INDEX_FILE_NAME;
+         this.preFilepath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + PRE_INDEX_FILE_NAME;
+         initFile();
+         TieredStoreExecutor.COMMON_SCHEDULED_EXECUTOR.scheduleWithFixedDelay(() -> {
+             try {
+                 curFileLock.lock();
+                 try {
+                     synchronized (TieredIndexFile.class) {
+                         MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+                         int indexNum = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+                         long lastIndexTime = mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+                         // Seal and roll a non-empty file that has not received entries for a while.
+                         if (indexNum > 0 && System.currentTimeMillis() - lastIndexTime > storeConfig.getTieredStoreIndexFileRollingIdleInterval()) {
+                             mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+                             rollingFile();
+                         }
+                         if (inflightCompactFuture.isDone() && preMappedFile != null && preMappedFile.isAvailable()) {
+                             inflightCompactFuture = TieredStoreExecutor.COMPACT_INDEX_FILE_EXECUTOR.submit(new CompactTask(storeConfig, preMappedFile, fileQueue), null);
+                         }
+                     }
+                 } finally {
+                     curFileLock.unlock();
+                 }
+             } catch (Throwable throwable) {
+                 logger.error("TieredIndexFile: submit compact index file task failed:", throwable);
+             }
+         }, 10, 10, TimeUnit.SECONDS);
+     }
+
+     /** Returns true when the last four bytes of the mapped file hold the end magic code. */
+     private static boolean isFileSealed(MappedFile mappedFile) {
+         return mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == INDEX_FILE_END_MAGIC_CODE;
+     }
+
+     /**
+      * Writes an empty header and marks every hash slot as unused (-1), unless the file already
+      * starts with the begin magic code. Safe to call on an already initialized file.
+      */
+     private void initIndexFileHeader(MappedFile mappedFile) {
+         MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer();
+         if (mappedByteBuffer.getInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION) != INDEX_FILE_BEGIN_MAGIC_CODE) {
+             mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+             mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L);
+             mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L);
+             mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0);
+             mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0);
+             for (int i = 0; i < maxHashSlotNum; i++) {
+                 mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE, -1);
+             }
+             mappedByteBuffer.putInt(fileMaxSize - 4, -1);
+         }
+     }
+
+     /**
+      * Maps the local files at startup. A current file that was sealed before the restart is moved
+      * to the previous slot, and any previous file is handed to compaction.
+      *
+      * @throws IOException if a file cannot be mapped
+      */
+     private void initFile() throws IOException {
+         curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+         initIndexFileHeader(curMappedFile);
+         File preFile = new File(preFilepath);
+         boolean preFileExists = preFile.exists();
+         if (preFileExists) {
+             preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize);
+         }
+
+         if (isFileSealed(curMappedFile)) {
+             // NOTE(review): the old previous file is deleted while preMappedFile still maps it and
+             // before it was compacted - confirm that dropping it here is intended.
+             if (preFileExists) {
+                 preFile.delete();
+             }
+             boolean rename = curMappedFile.renameTo(preFilepath);
+             if (rename) {
+                 preMappedFile = curMappedFile;
+                 curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+                 // The fresh file is all zeroes: without a header its slots would read as 0 instead
+                 // of -1 and the begin timestamp would never be set (rollingFile() does the same).
+                 initIndexFileHeader(curMappedFile);
+                 preFileExists = true;
+             }
+         }
+         if (preFileExists) {
+             tryToCompactPreFile();
+         }
+     }
+
+     /**
+      * Adds an index entry for a message key.
+      *
+      * @param mq queue of the message; its topic is part of the hashed key, its queue id is stored
+      * @param topicId numeric topic id stored in the entry
+      * @param key message key
+      * @param offset offset stored in the entry
+      * @param size size stored in the entry
+      * @param timeStamp timestamp of the message; stored as a difference to the file's begin timestamp
+      * @return the outcome reported by {@code putKey}
+      */
+     public AppendResult append(MessageQueue mq, int topicId, String key, long offset, int size, long timeStamp) {
+         return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), key)), offset, size, timeStamp);
+     }
+
+     /**
+      * Moves the current file to the previous slot and opens a new current file. Does nothing but
+      * trigger compaction when a previous file still exists.
+      *
+      * @return true if a new current file was opened
+      * @throws IOException if the new current file cannot be mapped
+      */
+     private boolean rollingFile() throws IOException {
+         File preFile = new File(preFilepath);
+         boolean preFileExists = preFile.exists();
+         if (!preFileExists) {
+             boolean rename = curMappedFile.renameTo(preFilepath);
+             if (rename) {
+                 preMappedFile = curMappedFile;
+                 curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+                 initIndexFileHeader(curMappedFile);
+                 tryToCompactPreFile();
+                 return true;
+             } else {
+                 logger.error("TieredIndexFile#rollingFile: rename current file failed");
+                 return false;
+             }
+         }
+         tryToCompactPreFile();
+         return false;
+     }
+
+     /** Submits a compaction of the previous file unless one is still running. */
+     private void tryToCompactPreFile() throws IOException {
+         synchronized (TieredIndexFile.class) {
+             if (inflightCompactFuture.isDone()) {
+                 inflightCompactFuture = TieredStoreExecutor.COMPACT_INDEX_FILE_EXECUTOR.submit(new CompactTask(storeConfig, preMappedFile, fileQueue), null);
+             }
+         }
+     }
+
+     /**
+      * Writes one index entry into the current file and links it into the chain of its hash slot.
+      *
+      * @return {@code FILE_FULL} if the current file is sealed and cannot be rolled,
+      * {@code IO_ERROR} if any exception occurs, {@code SUCCESS} otherwise
+      */
+     private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, long offset, int size, long timeStamp) {
+         curFileLock.lock();
+         try {
+             if (isFileSealed(curMappedFile) && !rollingFile()) {
+                 return AppendResult.FILE_FULL;
+             }
+
+             MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+
+             int slotPosition = hashCode % maxHashSlotNum;
+             int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE;
+
+             // Index of the entry currently at the head of this slot's chain, or -1 if the slot is empty.
+             int slotValue = mappedByteBuffer.getInt(slotOffset);
+
+             long beginTimeStamp = mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+             if (beginTimeStamp == -1) {
+                 mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, timeStamp);
+                 beginTimeStamp = timeStamp;
+             }
+
+             int indexCount = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+             int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+                 + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+
+             int timeDiff = (int) (timeStamp - beginTimeStamp);
+
+             // put hash index
+             mappedByteBuffer.putInt(indexOffset, hashCode);
+             mappedByteBuffer.putInt(indexOffset + 4, topicId);
+             mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId());
+             mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset);
+             mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size);
+             mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff);
+             mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, slotValue);
+
+             // put hash slot: the new entry becomes the head of the chain
+             mappedByteBuffer.putInt(slotOffset, indexCount);
+
+             // put header
+             indexCount += 1;
+             mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, indexCount);
+             mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp);
+             if (indexCount == maxIndexNum) {
+                 // File is full: seal it and try to roll right away. If rolling fails the file stays
+                 // sealed and the next putKey call retries.
+                 mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+                 rollingFile();
+             }
+             return AppendResult.SUCCESS;
+         } catch (Exception e) {
+             logger.error("TieredIndexFile#putKey: put key failed:", e);
+             return AppendResult.IO_ERROR;
+         } finally {
+             curFileLock.unlock();
+         }
+     }
+
+     /**
+      * Looks up the index blocks for a key in the compacted files of the file queue.
+      *
+      * <p>For every segment returned by {@code fileQueue.getFileListByTime(beginTime, endTime)}, visited
+      * from last to first, the key's 8-byte slot is read as (position, size) and the block it
+      * points to is fetched.
+      *
+      * @return a future with one pair per segment that has a non-empty block for the slot: the
+      * segment's begin timestamp and the raw block; an empty list if no segment matches
+      */
+     public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime, long endTime) {
+         int hashCode = indexKeyHashMethod(buildKey(topic, key));
+         int slotPosition = hashCode % maxHashSlotNum;
+         List<TieredFileSegment> fileSegmentList = fileQueue.getFileListByTime(beginTime, endTime);
+         CompletableFuture<List<Pair<Long, ByteBuffer>>> future = null;
+         for (int i = fileSegmentList.size() - 1; i >= 0; i--) {
+             TieredFileSegment fileSegment = fileSegmentList.get(i);
+             CompletableFuture<ByteBuffer> tmpFuture = fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE)
+                 .thenCompose(slotBuffer -> {
+                     int indexPosition = slotBuffer.getInt();
+                     if (indexPosition == -1) {
+                         return CompletableFuture.completedFuture(null);
+                     }
+
+                     int indexSize = slotBuffer.getInt();
+                     if (indexSize <= 0) {
+                         return CompletableFuture.completedFuture(null);
+                     }
+                     return fileSegment.readAsync(indexPosition, indexSize);
+                 });
+             if (future == null) {
+                 future = tmpFuture.thenApply(indexBuffer -> {
+                     List<Pair<Long, ByteBuffer>> result = new ArrayList<>();
+                     if (indexBuffer != null) {
+                         result.add(Pair.of(fileSegment.getBeginTimestamp(), indexBuffer));
+                     }
+                     return result;
+                 });
+             } else {
+                 future = future.thenCombine(tmpFuture, (indexList, indexBuffer) -> {
+                     if (indexBuffer != null) {
+                         indexList.add(Pair.of(fileSegment.getBeginTimestamp(), indexBuffer));
+                     }
+                     return indexList;
+                 });
+             }
+         }
+         return future == null ? CompletableFuture.completedFuture(new ArrayList<>()) : future;
+     }
+
+     /** Builds the string that is hashed for a key: {@code topic + "#" + key}. */
+     public static String buildKey(String topic, String key) {
+         return topic + "#" + key;
+     }
+
+     /**
+      * Returns a non-negative hash of the key: the absolute value of {@code key.hashCode()}, or 0
+      * when the hash is {@code Integer.MIN_VALUE} (whose absolute value is still negative).
+      */
+     public static int indexKeyHashMethod(String key) {
+         int keyHash = key.hashCode();
+         int keyHashPositive = Math.abs(keyHash);
+         if (keyHashPositive < 0) {
+             keyHashPositive = 0;
+         }
+         return keyHashPositive;
+     }
+
+     /**
+      * Commits the file queue and, when {@code sync} is true, also waits for the in-flight
+      * compaction. A failed, cancelled or interrupted wait is logged and does not fail the commit.
+      */
+     public void commit(boolean sync) {
+         fileQueue.commit(sync);
+         if (!sync) {
+             return;
+         }
+         try {
+             inflightCompactFuture.get();
+         } catch (InterruptedException e) {
+             // Keep the interrupt visible to the caller instead of silently clearing it.
+             Thread.currentThread().interrupt();
+             logger.warn("TieredIndexFile#commit: interrupted while waiting for compaction", e);
+         } catch (Exception e) {
+             // Covers ExecutionException and the CancellationException caused by destroy().
+             logger.warn("TieredIndexFile#commit: waiting for compaction failed:", e);
+         }
+     }
+
+     /** Delegates to {@code fileQueue.cleanExpiredFile(expireTimestamp)}. */
+     public void cleanExpiredFile(long expireTimestamp) {
+         fileQueue.cleanExpiredFile(expireTimestamp);
+     }
+
+     /** Delegates to {@code fileQueue.destroyExpiredFile()}. */
+     public void destroyExpiredFile() {
+         fileQueue.destroyExpiredFile();
+     }
+
+     /**
+      * Cancels the in-flight compaction, destroys both local mapped files, removes a leftover
+      * compact file and destroys the file queue.
+      */
+     public void destroy() {
+         inflightCompactFuture.cancel(true);
+         if (preMappedFile != null) {
+             preMappedFile.destroy(-1);
+         }
+         if (curMappedFile != null) {
+             curMappedFile.destroy(-1);
+         }
+         String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+         File compactFile = new File(compactFilePath);
+         if (compactFile.exists()) {
+             compactFile.delete();
+         }
+         fileQueue.destroy();
+     }
+
+     /**
+      * Rewrites a sealed local index file into the compact layout and appends it to the file queue.
+      *
+      * <p>In the compact layout each slot holds (position, size) of one contiguous block that
+      * contains all entries of that slot, 28 bytes each, without the chain link.
+      */
+     static class CompactTask implements Runnable {
+         private final TieredMessageStoreConfig storeConfig;
+
+         private final int maxHashSlotNum;
+         private final int maxIndexNum;
+         private final int fileMaxSize;
+         private final MappedFile originFile;
+         private final TieredFileQueue fileQueue;
+         private final MappedFile compactFile;
+
+         /**
+          * Prepares a fresh compact scratch file; an existing one is deleted first.
+          *
+          * @throws IOException if the scratch file cannot be mapped
+          */
+         public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile,
+             TieredFileQueue fileQueue) throws IOException {
+             this.storeConfig = storeConfig;
+             this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+             this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+             this.originFile = originFile;
+             this.fileQueue = fileQueue;
+             String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+             fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
+             // TODO check magic code, upload immediately when compact complete
+             File staleCompactFile = new File(compactFilePath);
+             if (staleCompactFile.exists()) {
+                 staleCompactFile.delete();
+             }
+             this.compactFile = new DefaultMappedFile(compactFilePath, fileMaxSize);
+         }
+
+         @Override
+         public void run() {
+             try {
+                 compact();
+             } catch (Throwable throwable) {
+                 logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable);
+             }
+         }
+
+         /**
+          * Compacts the origin file, appends and synchronously commits the result to the file
+          * queue, then destroys both local files. An unsealed origin file is treated as a bug:
+          * both files are destroyed without uploading anything.
+          */
+         public void compact() {
+             if (!isFileSealed(originFile)) {
+                 logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to compact unsealed file");
+                 originFile.destroy(-1);
+                 compactFile.destroy(-1);
+                 return;
+             }
+
+             buildCompactFile();
+             fileQueue.append(compactFile.getMappedByteBuffer());
+             fileQueue.commit(true);
+             compactFile.destroy(-1);
+             originFile.destroy(-1);
+         }
+
+         /** Copies header and entries of the origin file into the compact file, slot by slot. */
+         private void buildCompactFile() {
+             MappedByteBuffer originMappedByteBuffer = originFile.getMappedByteBuffer();
+             MappedByteBuffer compactMappedByteBuffer = compactFile.getMappedByteBuffer();
+             compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+             compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION));
+             compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION));
+             compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, maxHashSlotNum);
+             compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION));
+
+             // Write position of the next slot block; blocks start right after the slot table.
+             int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE);
+             for (int i = 0; i < maxHashSlotNum; i++) {
+                 int slotOffset = INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE;
+                 int slotValue = originMappedByteBuffer.getInt(slotOffset);
+                 if (slotValue != -1) {
+                     int indexTotalSize = 0;
+                     int indexPosition = slotValue;
+                     // Follow the chain from the newest entry of the slot to the oldest.
+                     while (indexPosition >= 0 && indexPosition < maxIndexNum) {
+                         int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+                             + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+                         int rePutIndexOffset = rePutSlotValue + indexTotalSize;
+
+                         compactMappedByteBuffer.putInt(rePutIndexOffset, originMappedByteBuffer.getInt(indexOffset));
+                         compactMappedByteBuffer.putInt(rePutIndexOffset + 4, originMappedByteBuffer.getInt(indexOffset + 4));
+                         compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4));
+                         compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4));
+                         compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8));
+                         compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4));
+
+                         indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE;
+                         indexPosition = originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4);
+                     }
+                     compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue);
+                     compactMappedByteBuffer.putInt(slotOffset + 4, indexTotalSize);
+                     rePutSlotValue += indexTotalSize;
+                 }
+             }
+             // NOTE(review): the header ends up with the END magic code and the trailer with the
+             // BEGIN magic code, which looks swapped. Kept as is because it defines the uploaded
+             // format - confirm against the readers before changing.
+             compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_END_MAGIC_CODE);
+             compactMappedByteBuffer.putInt(rePutSlotValue, INDEX_FILE_BEGIN_MAGIC_CODE);
+             // Only the used part of the scratch file is appended to the file queue.
+             compactMappedByteBuffer.limit(rePutSlotValue + 4);
+         }
+     }
+ }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
similarity index 66%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
index c85317177..691d869fa 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
@@ -14,18 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.tiered.common;
+package org.apache.rocketmq.store.tiered.exception;
-import java.io.File;
-
-public class TieredMessageStoreConfig {
- private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
-
- public String getStorePathRootDir() {
- return storePathRootDir;
- }
-
- public void setStorePathRootDir(String storePathRootDir) {
- this.storePathRootDir = storePathRootDir;
- }
+/**
+ * Error categories carried by TieredStoreException.
+ * NOTE(review): the meaning of each constant is inferred from its name only - confirm at the throw sites.
+ */
+public enum TieredStoreErrorCode {
+ ILLEGAL_OFFSET,
+ ILLEGAL_PARAM,
+ DOWNLOAD_LENGTH_NOT_CORRECT,
+ NO_NEW_DATA,
+ STORAGE_PROVIDER_ERROR,
+ IO_ERROR,
+ UNKNOWN
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java
new file mode 100644
index 000000000..f29840228
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.exception;
+
+ /**
+  * Unchecked exception of the tiered store. Carries an error code and, optionally, a request id
+  * and a position; both optional values are appended to {@link #toString()} when present.
+  */
+ public class TieredStoreException extends RuntimeException {
+     private TieredStoreErrorCode errorCode;
+     // -1 means "no position"; such a position is left out of toString().
+     private int position = -1;
+
+     private String requestId;
+
+     /**
+      * @param errorCode category of the failure
+      * @param errorMessage detail message
+      */
+     public TieredStoreException(TieredStoreErrorCode errorCode, String errorMessage) {
+         super(errorMessage);
+         this.errorCode = errorCode;
+     }
+
+     /**
+      * @param errorCode category of the failure
+      * @param errorMessage detail message
+      * @param requestId id of the request that failed; shown in {@link #toString()}
+      */
+     public TieredStoreException(TieredStoreErrorCode errorCode, String errorMessage, String requestId) {
+         super(errorMessage);
+         this.errorCode = errorCode;
+         this.requestId = requestId;
+     }
+
+     public TieredStoreErrorCode getErrorCode() {
+         return errorCode;
+     }
+
+     public void setErrorCode(TieredStoreErrorCode errorCode) {
+         this.errorCode = errorCode;
+     }
+
+     public int getPosition() {
+         return position;
+     }
+
+     public void setPosition(int position) {
+         this.position = position;
+     }
+
+     /** Returns the standard exception text followed by the request id and position, when set. */
+     @Override
+     public String toString() {
+         StringBuilder text = new StringBuilder(super.toString());
+         if (requestId != null) {
+             text.append(" requestId: ").append(requestId);
+         }
+         if (position != -1) {
+             text.append(", position: ").append(position);
+         }
+         return text.toString();
+     }
+ }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java
new file mode 100644
index 000000000..da9bb5ce2
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+
+public class TieredMetadataManager extends ConfigManager implements TieredMetadataStore {
+ private final AtomicInteger maxTopicId = new AtomicInteger(0);
+ private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> commitLogFileSegmentTable = new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> consumeQueueFileSegmentTable = new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> indexFileSegmentTable = new ConcurrentHashMap<>(1024);
+ private final TieredMessageStoreConfig storeConfig;
+
+ public TieredMetadataManager(TieredMessageStoreConfig storeConfig) {
+ this.storeConfig = storeConfig;
+ }
+
+ @Override
+ public String encode() {
+ return encode(false);
+ }
+
+ @Override
+ public String encode(boolean prettyFormat) {
+ TieredMetadataSerializeWrapper dataWrapper = new TieredMetadataSerializeWrapper();
+ dataWrapper.setMaxTopicId(maxTopicId);
+ dataWrapper.setTopicMetadataTable(topicMetadataTable);
+ dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
+ return dataWrapper.toJson(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ TieredMetadataSerializeWrapper dataWrapper =
+ TieredMetadataSerializeWrapper.fromJson(jsonString, TieredMetadataSerializeWrapper.class);
+ if (dataWrapper != null) {
+ maxTopicId.set(dataWrapper.getMaxTopicId().get());
+ topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
+ dataWrapper.getQueueMetadataTable()
+ .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
+ }
+ }
+ }
+
+ @Override
+ public void setMaxTopicId(int maxTopicId) {
+ this.maxTopicId.set(maxTopicId);
+ }
+
+ @Nullable
+ @Override
+ public TopicMetadata getTopic(String topic) {
+ return topicMetadataTable.get(topic);
+ }
+
+ @Override
+ public void iterateTopic(Consumer<TopicMetadata> callback) {
+ topicMetadataTable.values().forEach(callback);
+ }
+
+ @Override
+ public TopicMetadata addTopic(String topic, long reserveTime) {
+ TopicMetadata old = getTopic(topic);
+ if (old != null) {
+ return old;
+ }
+ TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
+ topicMetadataTable.put(topic, metadata);
+ return metadata;
+ }
+
+ @Override
+ public void updateTopicReserveTime(String topic, long reserveTime) {
+ TopicMetadata metadata = getTopic(topic);
+ if (metadata == null) {
+ return;
+ }
+ metadata.setReserveTime(reserveTime);
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ public void updateTopicStatus(String topic, int status) {
+ TopicMetadata metadata = getTopic(topic);
+ if (metadata == null) {
+ return;
+ }
+ metadata.setStatus(status);
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ topicMetadataTable.remove(topic);
+ }
+
+ @Nullable
+ @Override
+ public QueueMetadata getQueue(MessageQueue queue) {
+ if (!queueMetadataTable.containsKey(queue.getTopic())) {
+ return null;
+ }
+ return queueMetadataTable.get(queue.getTopic())
+ .get(queue.getQueueId());
+ }
+
+ @Override
+ public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
+ queueMetadataTable.get(topic)
+ .values()
+ .forEach(callback);
+ }
+
+ @Override
+ public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
+ QueueMetadata old = getQueue(queue);
+ if (old != null) {
+ return old;
+ }
+ QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
+ queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
+ .put(queue.getQueueId(), metadata);
+ return metadata;
+ }
+
+ @Override
+ public void updateQueue(QueueMetadata metadata) {
+ MessageQueue queue = metadata.getQueue();
+ if (queueMetadataTable.containsKey(queue.getTopic())) {
+ ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
+ if (metadataMap.containsKey(queue.getQueueId())) {
+ metadata.setUpdateTimestamp(System.currentTimeMillis());
+ metadataMap.put(queue.getQueueId(), metadata);
+ }
+ }
+ }
+
+ @Override
+ public void deleteQueue(MessageQueue queue) {
+ if (queueMetadataTable.containsKey(queue.getTopic())) {
+ queueMetadataTable.get(queue.getTopic())
+ .remove(queue.getQueueId());
+ }
+ }
+
+ @Nullable
+ @Override
+ public FileSegmentMetadata getFileSegment(TieredFileSegment fileSegment) {
+ switch (fileSegment.getFileType()) {
+ case COMMIT_LOG:
+ if (commitLogFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ return commitLogFileSegmentTable.get(fileSegment.getMessageQueue())
+ .get(fileSegment.getBaseOffset());
+ }
+ break;
+ case CONSUME_QUEUE:
+ if (consumeQueueFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ return consumeQueueFileSegmentTable.get(fileSegment.getMessageQueue())
+ .get(fileSegment.getBaseOffset());
+ }
+ break;
+ case INDEX:
+ if (indexFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ return indexFileSegmentTable.get(fileSegment.getMessageQueue())
+ .get(fileSegment.getBaseOffset());
+ }
+ break;
+ }
+ return null;
+ }
+
+ @Override
+ public void iterateFileSegment(Consumer<FileSegmentMetadata> callback) {
+ commitLogFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+ consumeQueueFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+ indexFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+ }
+
+ @Override
+ public void iterateFileSegment(TieredFileSegment.FileSegmentType type, String topic, int queueId,
+ Consumer<FileSegmentMetadata> callback) {
+ MessageQueue messageQueue = new MessageQueue(topic, storeConfig.getBrokerName(), queueId);
+ switch (type) {
+ case COMMIT_LOG:
+ if (commitLogFileSegmentTable.containsKey(messageQueue)) {
+ commitLogFileSegmentTable.get(messageQueue)
+ .forEach((offset, metadata) -> callback.accept(metadata));
+ }
+ break;
+ case CONSUME_QUEUE:
+ if (consumeQueueFileSegmentTable.containsKey(messageQueue)) {
+ consumeQueueFileSegmentTable.get(messageQueue)
+ .forEach((offset, metadata) -> callback.accept(metadata));
+ }
+ break;
+ case INDEX:
+ if (indexFileSegmentTable.containsKey(messageQueue)) {
+ indexFileSegmentTable.get(messageQueue)
+ .forEach((offset, metadata) -> callback.accept(metadata));
+ }
+ break;
+ }
+ }
+
+ @Override
+ public FileSegmentMetadata updateFileSegment(TieredFileSegment fileSegment) {
+ FileSegmentMetadata old = getFileSegment(fileSegment);
+
+ if (old == null) {
+ FileSegmentMetadata metadata = new FileSegmentMetadata(fileSegment.getMessageQueue(),
+ fileSegment.getFileType().getType(),
+ fileSegment.getBaseOffset(),
+ fileSegment.getPath());
+ if (fileSegment.isClosed()) {
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ }
+ metadata.setBeginTimestamp(fileSegment.getBeginTimestamp());
+ metadata.setEndTimestamp(fileSegment.getEndTimestamp());
+ switch (fileSegment.getFileType()) {
+ case COMMIT_LOG:
+ commitLogFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+ .put(fileSegment.getBaseOffset(), metadata);
+ break;
+ case CONSUME_QUEUE:
+ consumeQueueFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+ .put(fileSegment.getBaseOffset(), metadata);
+ break;
+ case INDEX:
+ indexFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+ .put(fileSegment.getBaseOffset(), metadata);
+ break;
+ }
+ return metadata;
+ }
+
+ if (old.getStatus() == FileSegmentMetadata.STATUS_NEW && fileSegment.isFull() && !fileSegment.needCommit()) {
+ old.setStatus(FileSegmentMetadata.STATUS_SEALED);
+ old.setSealTimestamp(System.currentTimeMillis());
+ }
+ if (fileSegment.isClosed()) {
+ old.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ }
+ old.setSize(fileSegment.getCommitPosition());
+ old.setBeginTimestamp(fileSegment.getBeginTimestamp());
+ old.setEndTimestamp(fileSegment.getEndTimestamp());
+ return old;
+ }
+
+ @Override
+ public void deleteFileSegment(MessageQueue mq) {
+ commitLogFileSegmentTable.remove(mq);
+ consumeQueueFileSegmentTable.remove(mq);
+ indexFileSegmentTable.remove(mq);
+ }
+
+ @Override
+ public void deleteFileSegment(TieredFileSegment fileSegment) {
+ switch (fileSegment.getFileType()) {
+ case COMMIT_LOG:
+ if (commitLogFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ commitLogFileSegmentTable.get(fileSegment.getMessageQueue())
+ .remove(fileSegment.getBaseOffset());
+ }
+ break;
+ case CONSUME_QUEUE:
+ if (consumeQueueFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ consumeQueueFileSegmentTable.get(fileSegment.getMessageQueue())
+ .remove(fileSegment.getBaseOffset());
+ }
+ break;
+ case INDEX:
+ if (indexFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+ indexFileSegmentTable.get(fileSegment.getMessageQueue())
+ .remove(fileSegment.getBaseOffset());
+ }
+ break;
+ }
+ }
+
+ @Override
+ public void destroy() {
+ maxTopicId.set(0);
+ topicMetadataTable.clear();
+ queueMetadataTable.clear();
+ commitLogFileSegmentTable.clear();
+ consumeQueueFileSegmentTable.clear();
+ indexFileSegmentTable.clear();
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
similarity index 96%
rename from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
rename to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
index e4e068aff..82f969e24 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
@@ -20,7 +20,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class TieredStoreMetadataSerializeWrapper extends RemotingSerializable {
+public class TieredMetadataSerializeWrapper extends RemotingSerializable {
private AtomicInteger maxTopicId;
private Map<String /*topic*/, TopicMetadata> topicMetadataTable;
private Map<String /*topic*/, Map<Integer /*queueId*/, QueueMetadata>> queueMetadataTable;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
similarity index 59%
rename from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
rename to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
index 7701258af..be746c7ea 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
@@ -19,20 +19,66 @@ package org.apache.rocketmq.store.tiered.metadata;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+
+public interface TieredMetadataStore {
+ /**
+ * Topic metadata operation
+ *
+ * @see org.apache.rocketmq.store.tiered.metadata.TopicMetadata
+ */
+ void setMaxTopicId(int maxTopicId);
-public interface TieredStoreMetadataStore {
@Nullable
TopicMetadata getTopic(String topic);
+
void iterateTopic(Consumer<TopicMetadata> callback);
+
TopicMetadata addTopic(String topic, long reserveTime);
+
void updateTopicReserveTime(String topic, long reserveTime);
+
void updateTopicStatus(String topic, int status);
+
void deleteTopic(String topic);
+ /**
+ * Queue metadata operation
+ *
+ * @see org.apache.rocketmq.store.tiered.metadata.QueueMetadata
+ */
@Nullable
QueueMetadata getQueue(MessageQueue queue);
+
void iterateQueue(String topic, Consumer<QueueMetadata> callback);
+
QueueMetadata addQueue(MessageQueue queue, long baseOffset);
+
void updateQueue(QueueMetadata metadata);
+
void deleteQueue(MessageQueue queue);
+
+ /**
+ * File segment metadata operation
+ *
+ * @see org.apache.rocketmq.store.tiered.metadata.FileSegmentMetadata
+ */
+ @Nullable
+ FileSegmentMetadata getFileSegment(TieredFileSegment fileSegment);
+
+ void iterateFileSegment(Consumer<FileSegmentMetadata> callback);
+
+ void iterateFileSegment(TieredFileSegment.FileSegmentType type, String topic, int queueId,
+ Consumer<FileSegmentMetadata> callback);
+
+ FileSegmentMetadata updateFileSegment(TieredFileSegment fileSegment);
+
+ void deleteFileSegment(MessageQueue mq);
+
+ void deleteFileSegment(TieredFileSegment fileSegment);
+
+ /**
+ * Clean all metadata
+ */
+ void destroy();
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
deleted file mode 100644
index e4f241af1..000000000
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.tiered.metadata;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import javax.annotation.Nullable;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
-
-public class TieredStoreMetadataManager extends ConfigManager implements TieredStoreMetadataStore {
- private final AtomicInteger maxTopicId = new AtomicInteger(0);
- private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
- private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
- private final TieredMessageStoreConfig storeConfig;
-
- public TieredStoreMetadataManager(TieredMessageStoreConfig storeConfig) {
- this.storeConfig = storeConfig;
- }
- @Override
- public String encode() {
- return encode(false);
- }
-
- @Override
- public String encode(boolean prettyFormat) {
- TieredStoreMetadataSerializeWrapper dataWrapper = new TieredStoreMetadataSerializeWrapper();
- dataWrapper.setMaxTopicId(maxTopicId);
- dataWrapper.setTopicMetadataTable(topicMetadataTable);
- dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
- return dataWrapper.toJson(false);
- }
-
- @Override
- public String configFilePath() {
- return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
- }
-
- @Override
- public void decode(String jsonString) {
- if (jsonString != null) {
- TieredStoreMetadataSerializeWrapper dataWrapper =
- TieredStoreMetadataSerializeWrapper.fromJson(jsonString, TieredStoreMetadataSerializeWrapper.class);
- if (dataWrapper != null) {
- maxTopicId.set(dataWrapper.getMaxTopicId().get());
- topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
- dataWrapper.getQueueMetadataTable()
- .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
- }
- }
- }
-
- @Override
- @Nullable
- public TopicMetadata getTopic(String topic) {
- return topicMetadataTable.get(topic);
- }
-
- @Override
- public void iterateTopic(Consumer<TopicMetadata> callback) {
- topicMetadataTable.values().forEach(callback);
- }
-
- @Override
- public TopicMetadata addTopic(String topic, long reserveTime) {
- TopicMetadata old = getTopic(topic);
- if (old != null) {
- return old;
- }
- TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
- topicMetadataTable.put(topic, metadata);
- return metadata;
- }
-
- @Override
- public void updateTopicReserveTime(String topic, long reserveTime) {
- TopicMetadata metadata = getTopic(topic);
- if (metadata == null) {
- return;
- }
- metadata.setReserveTime(reserveTime);
- metadata.setUpdateTimestamp(System.currentTimeMillis());
- }
-
- @Override
- public void updateTopicStatus(String topic, int status) {
- TopicMetadata metadata = getTopic(topic);
- if (metadata == null) {
- return;
- }
- metadata.setStatus(status);
- metadata.setUpdateTimestamp(System.currentTimeMillis());
- }
-
- @Override
- public void deleteTopic(String topic) {
- topicMetadataTable.remove(topic);
- }
-
- @Override
- @Nullable
- public QueueMetadata getQueue(MessageQueue queue) {
- if (!queueMetadataTable.containsKey(queue.getTopic())) {
- return null;
- }
- return queueMetadataTable.get(queue.getTopic())
- .get(queue.getQueueId());
- }
-
- @Override
- public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
- queueMetadataTable.get(topic)
- .values()
- .forEach(callback);
- }
-
- @Override
- public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
- QueueMetadata old = getQueue(queue);
- if (old != null) {
- return old;
- }
- QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
- queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
- .put(queue.getQueueId(), metadata);
- return metadata;
- }
-
- @Override
- public void updateQueue(QueueMetadata metadata) {
- MessageQueue queue = metadata.getQueue();
- if (queueMetadataTable.containsKey(queue.getTopic())) {
- ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
- if (metadataMap.containsKey(queue.getQueueId())) {
- metadata.setUpdateTimestamp(System.currentTimeMillis());
- metadataMap.put(queue.getQueueId(), metadata);
- }
- }
- }
-
- @Override
- public void deleteQueue(MessageQueue queue) {
- if (queueMetadataTable.containsKey(queue.getTopic())) {
- queueMetadataTable.get(queue.getTopic())
- .remove(queue.getQueueId());
- }
- }
-}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
similarity index 65%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
index c85317177..9a17fcf72 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
@@ -14,18 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.tiered.common;
+package org.apache.rocketmq.store.tiered.util;
-import java.io.File;
+import java.nio.ByteBuffer;
-public class TieredMessageStoreConfig {
- private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+public class CQItemBufferUtil {
+ public static long getCommitLogOffset(ByteBuffer cqItem) {
+ return cqItem.getLong(cqItem.position());
+ }
- public String getStorePathRootDir() {
- return storePathRootDir;
+ public static int getSize(ByteBuffer cqItem) {
+ return cqItem.getInt(cqItem.position() + 8);
}
- public void setStorePathRootDir(String storePathRootDir) {
- this.storePathRootDir = storePathRootDir;
+ public static long getTagCode(ByteBuffer cqItem) {
+ return cqItem.getLong(cqItem.position() + 12);
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java
new file mode 100644
index 000000000..7306a12b3
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredConsumeQueue;
+
+public class MessageBufferUtil {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ public static final int QUEUE_OFFSET_POSITION = 4 /* total size */
+ + 4 /* magic code */
+ + 4 /* body CRC */
+ + 4 /* queue id */
+ + 4; /* flag */
+
+ public static final int PHYSICAL_OFFSET_POSITION = 4 /* total size */
+ + 4 /* magic code */
+ + 4 /* body CRC */
+ + 4 /* queue id */
+ + 4 /* flag */
+ + 8; /* queue offset */
+
+ public static final int SYS_FLAG_OFFSET_POSITION = 4 /* total size */
+ + 4 /* magic code */
+ + 4 /* body CRC */
+ + 4 /* queue id */
+ + 4 /* flag */
+ + 8 /* queue offset */
+ + 8; /* physical offset */
+
+ public static final int STORE_TIMESTAMP_POSITION = 4 /* total size */
+ + 4 /* magic code */
+ + 4 /* body CRC */
+ + 4 /* queue id */
+ + 4 /* flag */
+ + 8 /* queue offset */
+ + 8 /* physical offset */
+ + 4 /* sys flag */
+ + 8 /* born timestamp */
+ + 8; /* born host */
+
+ public static final int STORE_HOST_POSITION = 4 /* total size */
+ + 4 /* magic code */
+ + 4 /* body CRC */
+ + 4 /* queue id */
+ + 4 /* flag */
+ + 8 /* queue offset */
+ + 8 /* physical offset */
+ + 4 /* sys flag */
+ + 8 /* born timestamp */
+ + 8 /* born host */
+ + 8; /* store timestamp */
+
+ /**
+  * Reads the 4-byte total size field located at the buffer's current position.
+  * The read is absolute, so the buffer position is left untouched.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return the int stored at {@code message.position()}
+  */
+ public static int getTotalSize(ByteBuffer message) {
+     final int start = message.position();
+     return message.getInt(start);
+ }
+
+ /**
+  * Reads the 4-byte magic code that follows the total size field, i.e. the int stored
+  * 4 bytes after the buffer's current position. The buffer position is not changed.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return the int stored at {@code message.position() + 4}
+  */
+ public static int getMagicCode(ByteBuffer message) {
+     final int magicCodeIndex = message.position() + 4;
+     return message.getInt(magicCodeIndex);
+ }
+
+ /**
+  * Reads the 8-byte queue offset field of the message that starts at the buffer's
+  * current position. The buffer position is not changed.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return the long stored {@link #QUEUE_OFFSET_POSITION} bytes after the position
+  */
+ public static long getQueueOffset(ByteBuffer message) {
+     final int fieldIndex = message.position() + QUEUE_OFFSET_POSITION;
+     return message.getLong(fieldIndex);
+ }
+
+ /**
+  * Reads the 8-byte physical (commit log) offset field of the message that starts at
+  * the buffer's current position. The buffer position is not changed.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return the long stored {@link #PHYSICAL_OFFSET_POSITION} bytes after the position
+  */
+ public static long getCommitLogOffset(ByteBuffer message) {
+     final int fieldIndex = message.position() + PHYSICAL_OFFSET_POSITION;
+     return message.getLong(fieldIndex);
+ }
+
+ /**
+  * Reads the 8-byte store timestamp field of the message that starts at the buffer's
+  * current position. The buffer position is not changed.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return the long stored {@link #STORE_TIMESTAMP_POSITION} bytes after the position
+  */
+ public static long getStoreTimeStamp(ByteBuffer message) {
+     final int fieldIndex = message.position() + STORE_TIMESTAMP_POSITION;
+     return message.getLong(fieldIndex);
+ }
+
+ /**
+  * Builds the binary offset id of a message: the 8 bytes stored at
+  * {@link #STORE_HOST_POSITION} followed by the 8-byte commit log offset.
+  *
+  * @param message buffer positioned at the start of a message; its position is not changed
+  * @return a new heap buffer of {@code TieredStoreUtil.MSG_ID_LENGTH} bytes, flipped and ready to read
+  */
+ public static ByteBuffer getOffsetIdBuffer(ByteBuffer message) {
+     ByteBuffer idBuffer = ByteBuffer.allocate(TieredStoreUtil.MSG_ID_LENGTH);
+     idBuffer.limit(TieredStoreUtil.MSG_ID_LENGTH);
+     final long storeHostBytes = message.getLong(message.position() + STORE_HOST_POSITION);
+     idBuffer.putLong(storeHostBytes);
+     final long commitLogOffset = getCommitLogOffset(message);
+     idBuffer.putLong(commitLogOffset);
+     idBuffer.flip();
+     return idBuffer;
+ }
+
+ /**
+  * Returns the offset id of a message as text: the bytes produced by
+  * {@link #getOffsetIdBuffer(ByteBuffer)} converted with {@code UtilAll.bytes2string}.
+  *
+  * @param message buffer positioned at the start of a message
+  */
+ public static String getOffsetId(ByteBuffer message) {
+     final byte[] idBytes = getOffsetIdBuffer(message).array();
+     return UtilAll.bytes2string(idBytes);
+ }
+
+ /**
+  * Decodes the properties of a message via {@code MessageDecoder.decodeProperties}.
+  * Decoding works on a slice, so the position and limit of {@code message} itself
+  * are not modified by this method.
+  *
+  * @param message buffer positioned at the start of a message
+  * @return whatever {@code MessageDecoder.decodeProperties} yields for the slice
+  */
+ public static Map<String, String> getProperties(ByteBuffer message) {
+     return MessageDecoder.decodeProperties(message.slice());
+ }
+
+ /**
+  * Uses consume queue items to locate the individual messages inside a commit log buffer.
+  * <p>
+  * The commit log offset of the first consume queue item is taken as the origin of
+  * {@code msgBuffer}; every other item is addressed relative to it. When the addressed
+  * bytes carry the blank magic code, {@code TieredCommitLog.CODA_SIZE} bytes are skipped
+  * and the magic code is read again. Items whose message has an unknown magic code, or
+  * whose total size differs from the size recorded in the consume queue, are skipped.
+  * Both buffers are rewound before this method returns.
+  *
+  * @param cqBuffer  consume queue items; when its length is not a multiple of
+  *                  {@code TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE} an empty list is returned
+  * @param msgBuffer message data starting at the commit log offset of the first item
+  * @return pairs of (offset inside msgBuffer, message size) for the messages found; the list
+  *         may be incomplete when a bounds check fails or decoding throws
+  */
+ public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg size */>> splitMessageBuffer(
+     ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
+     cqBuffer.rewind();
+     msgBuffer.rewind();
+     List<Pair<Integer, Integer>> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+     if (!cqBuffer.hasRemaining()) {
+         // Nothing to split. Without this guard the read of the first item below throws on
+         // an empty buffer and the situation is reported as a decode error.
+         return messageList;
+     }
+     if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
+         logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
+             cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+         return messageList;
+     }
+     try {
+         long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+         for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
+             cqBuffer.position(pos);
+             int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
+             int size = CQItemBufferUtil.getSize(cqBuffer);
+             if (diff + size > msgBuffer.limit()) {
+                 // Log the limit the check was made against; remaining() depends on where the
+                 // previous iteration left the position and would misreport the buffer size.
+                 logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.limit());
+                 return messageList;
+             }
+             msgBuffer.position(diff);
+
+             int magicCode = getMagicCode(msgBuffer);
+             if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
+                 logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset");
+                 diff = diff + TieredCommitLog.CODA_SIZE;
+                 msgBuffer.position(diff);
+                 magicCode = getMagicCode(msgBuffer);
+             }
+             if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+                 logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code");
+                 continue;
+             }
+
+             if (getTotalSize(msgBuffer) != size) {
+                 logger.warn("MessageBufferUtil#splitMessage: message size is not right: expected: {}, actual: {}", size, getTotalSize(msgBuffer));
+                 continue;
+             }
+
+             messageList.add(Pair.of(diff, size));
+         }
+     } catch (Exception e) {
+         logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e);
+     } finally {
+         // Runs for the early return inside the loop as well, so callers always get rewound buffers.
+         cqBuffer.rewind();
+         msgBuffer.rewind();
+     }
+     return messageList;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java
new file mode 100644
index 000000000..17597fb08
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.lang.reflect.Constructor;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+
+public class TieredStoreUtil {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ private static final long BYTE = 1L;
+ private static final long KB = BYTE << 10;
+ private static final long MB = KB << 10;
+ private static final long GB = MB << 10;
+ private static final long TB = GB << 10;
+ private static final long PB = TB << 10;
+ private static final long EB = PB << 10;
+
+ public static final String TIERED_STORE_LOGGER_NAME = "RocketmqTieredStore";
+ public static final String RMQ_SYS_TIERED_STORE_INDEX_TOPIC = "rmq_sys_INDEX";
+ public static final String PROXY_HOUSEKEEPING_TOPIC_PREFIX = "rocketmq-proxy-";
+ public final static int MSG_ID_LENGTH = 8 + 8;
+
+ private static final DecimalFormat DEC_FORMAT = new DecimalFormat("#.##");
+
+ private final static List<String> SYSTEM_TOPIC_LIST = new LinkedList<String>() {
+ {
+ add(RMQ_SYS_TIERED_STORE_INDEX_TOPIC);
+ }
+ };
+
+ private final static List<String> SYSTEM_TOPIC_WHITE_LIST = new LinkedList<String>() {
+ {
+ }
+ };
+
+ private volatile static TieredMetadataStore metadataStoreInstance;
+
+ /**
+  * Formats {@code size / divider} with the shared {@code "#.##"} pattern and appends the unit name.
+  *
+  * @param size     raw size
+  * @param divider  number of bytes per unit
+  * @param unitName suffix appended to the formatted number
+  * @return the formatted quotient immediately followed by {@code unitName}
+  */
+ private static String formatSize(long size, long divider, String unitName) {
+     // DecimalFormat instances are not thread-safe and DEC_FORMAT is shared by every caller
+     // of this static helper, so concurrent formatting has to be serialized.
+     synchronized (DEC_FORMAT) {
+         return DEC_FORMAT.format((double) size / divider) + unitName;
+     }
+ }
+
+ /**
+  * Renders a byte count using the largest unit (EB down to KB) that does not exceed it.
+  * Values below 1 KB are rendered in "Bytes"; negative values are returned as plain numbers.
+  *
+  * @param size byte count
+  * @return human readable representation of {@code size}
+  */
+ public static String toHumanReadable(long size) {
+     if (size < 0) {
+         return String.valueOf(size);
+     }
+     // Ordered from the largest unit to the smallest so the first match wins.
+     final long[] dividers = {EB, PB, TB, GB, MB, KB};
+     final String[] unitNames = {"EB", "PB", "TB", "GB", "MB", "KB"};
+     for (int i = 0; i < dividers.length; i++) {
+         if (size >= dividers[i]) {
+             return formatSize(size, dividers[i], unitNames[i]);
+         }
+     }
+     return formatSize(size, BYTE, "Bytes");
+ }
+
+ /**
+  * Returns the first 8 hex characters of the MD5 digest of the UTF-8 bytes of {@code str}.
+  *
+  * @param str text to hash
+  * @return 8 lowercase hex characters, or an empty string when hashing throws
+  */
+ public static String getHash(String str) {
+     try {
+         byte[] digest = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8));
+         // %032x keeps the leading zeros of the 128-bit digest.
+         String hex = String.format("%032x", new BigInteger(1, digest));
+         return hex.substring(0, 8);
+     } catch (Exception ignore) {
+         return "";
+     }
+ }
+
+ /**
+  * Builds a file name from an offset: the first 8 hex characters of the MD5 digest of the
+  * offset's decimal text, followed by the offset padded to at least 20 integer digits.
+  *
+  * @param offset offset to encode
+  * @return hash prefix plus padded offset, or only the padded offset when hashing throws
+  */
+ public static String offset2FileName(final long offset) {
+     final NumberFormat padder = NumberFormat.getInstance();
+     padder.setMinimumIntegerDigits(20);
+     padder.setMaximumFractionDigits(0);
+     padder.setGroupingUsed(false);
+     final String paddedOffset = padder.format(offset);
+
+     try {
+         byte[] digest = MessageDigest.getInstance("MD5")
+             .digest(Long.toString(offset).getBytes(StandardCharsets.UTF_8));
+         String hex = String.format("%032x", new BigInteger(1, digest));
+         return hex.substring(0, 8) + paddedOffset;
+     } catch (Exception ignore) {
+         return paddedOffset;
+     }
+ }
+
+ /**
+  * Parses the offset stored in the last 20 characters of a file name.
+  *
+  * @param fileName file name whose last 20 characters are the decimal offset
+  * @return the parsed offset
+  */
+ public static long fileName2Offset(final String fileName) {
+     final int offsetStart = fileName.length() - 20;
+     final String offsetText = fileName.substring(offsetStart);
+     return Long.parseLong(offsetText);
+ }
+
+ /**
+  * Registers an additional topic name that {@code isSystemTopic} will report as a system topic.
+  * The name is appended to the static SYSTEM_TOPIC_LIST without any duplicate check.
+  * NOTE(review): the backing list is a plain LinkedList mutated without synchronization -
+  * confirm that registration only happens before concurrent readers exist.
+  *
+  * @param topic topic name to register
+  */
+ public static void addSystemTopic(final String topic) {
+ SYSTEM_TOPIC_LIST.add(topic);
+ }
+
+ /**
+  * Tells whether a topic is treated as a system topic by the tiered store.
+  * <p>
+  * Blank names and names on the white list are never system topics. Otherwise a topic is a
+  * system topic when it is on the system topic list, when {@code TopicValidator} reports it
+  * as one, or when it starts (ignoring case) with the proxy housekeeping prefix.
+  *
+  * @param topic topic name, may be {@code null} or blank
+  * @return {@code true} when the topic is considered a system topic
+  */
+ public static boolean isSystemTopic(final String topic) {
+     if (StringUtils.isBlank(topic)) {
+         return false;
+     }
+
+     if (SYSTEM_TOPIC_WHITE_LIST.contains(topic)) {
+         return false;
+     }
+
+     if (SYSTEM_TOPIC_LIST.contains(topic)) {
+         return true;
+     }
+     // Compare without String.toLowerCase(): that call uses the default locale, and e.g. under
+     // a Turkish locale "ROCKETMQ-PROXY-" lower-cases to a dotless i and no longer matches.
+     return TopicValidator.isSystemTopic(topic)
+         || StringUtils.startsWithIgnoreCase(topic, PROXY_HOUSEKEEPING_TOPIC_PREFIX);
+ }
+
+ /**
+  * Returns the shared metadata store, creating it on first use.
+  * <p>
+  * The implementation class is taken from {@code storeConfig.getTieredMetadataServiceProvider()}
+  * and instantiated reflectively through its constructor taking a {@code TieredMessageStoreConfig}.
+  * When instantiation fails the error is logged and the instance stays unset, so later calls
+  * try again.
+  *
+  * @param storeConfig configuration naming the provider class; also passed to its constructor
+  * @return the shared instance, or {@code null} when it could not be built
+  */
+ public static TieredMetadataStore getMetadataStore(TieredMessageStoreConfig storeConfig) {
+     // Fast path: already initialized, no locking needed.
+     if (metadataStoreInstance != null) {
+         return metadataStoreInstance;
+     }
+     synchronized (TieredMetadataStore.class) {
+         // Re-check under the lock, another thread may have built the instance meanwhile.
+         if (metadataStoreInstance == null) {
+             try {
+                 Class<? extends TieredMetadataStore> providerClass =
+                     Class.forName(storeConfig.getTieredMetadataServiceProvider()).asSubclass(TieredMetadataStore.class);
+                 Constructor<? extends TieredMetadataStore> providerConstructor =
+                     providerClass.getConstructor(TieredMessageStoreConfig.class);
+                 metadataStoreInstance = providerConstructor.newInstance(storeConfig);
+             } catch (Exception e) {
+                 logger.error("TieredMetadataStore#getInstance: build metadata store failed, provider class: {}", storeConfig.getTieredMetadataServiceProvider(), e);
+             }
+         }
+     }
+     return metadataStoreInstance;
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java
new file mode 100644
index 000000000..a6bf09f6f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TieredFileQueueTest {
+ TieredMessageStoreConfig storeConfig;
+ MessageQueue queue;
+
+ @Before
+ public void setUp() {
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.store.tiered.mock.MemoryFileSegment");
+ queue = new MessageQueue("TieredFileQueueTest", storeConfig.getBrokerName(), 0);
+ }
+
+ /**
+  * Removes the store directory used by the test and clears the shared metadata store.
+  */
+ @After
+ public void tearDown() throws IOException {
+     // Delete the directory that setUp() configured (system temp dir + "rmqut") rather than a
+     // hard-coded "/tmp/rmqut", which is a different path wherever the temp dir is not /tmp.
+     FileUtils.deleteDirectory(new File(storeConfig.getStorePathRootDir()));
+     TieredStoreUtil.getMetadataStore(storeConfig).destroy();
+ }
+
+ @Test
+ public void testGetFileSegment() throws ClassNotFoundException, NoSuchMethodException {
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ queue, storeConfig);
+ fileQueue.setBaseOffset(0);
+ TieredFileSegment segment1 = fileQueue.getFileToWrite();
+ segment1.initPosition(1000);
+ segment1.append(ByteBuffer.allocate(100), 0);
+ segment1.setFull();
+ segment1.commit();
+
+ TieredFileSegment segment2 = fileQueue.getFileToWrite();
+ Assert.assertNotSame(segment1, segment2);
+ Assert.assertEquals(1000 + 100 + TieredCommitLog.CODA_SIZE, segment1.getMaxOffset());
+ Assert.assertEquals(1000 + 100 + TieredCommitLog.CODA_SIZE, segment2.getBaseOffset());
+
+ Assert.assertSame(fileQueue.getSegmentIndexByOffset(1000), 0);
+ Assert.assertSame(fileQueue.getSegmentIndexByOffset(1050), 0);
+ Assert.assertSame(fileQueue.getSegmentIndexByOffset(1100 + TieredCommitLog.CODA_SIZE), 1);
+ Assert.assertSame(fileQueue.getSegmentIndexByOffset(1150), -1);
+ }
+
+ @Test
+ public void testAppendAndRead() throws ClassNotFoundException, NoSuchMethodException {
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, storeConfig);
+ fileQueue.setBaseOffset(0);
+ Assert.assertEquals(0, fileQueue.getMinOffset());
+ Assert.assertEquals(0, fileQueue.getCommitMsgQueueOffset());
+
+ TieredFileSegment segment1 = fileQueue.getFileToWrite();
+ segment1.initPosition(segment1.getSize());
+ Assert.assertEquals(0, segment1.getBaseOffset());
+ Assert.assertEquals(1000, fileQueue.getCommitOffset());
+ Assert.assertEquals(1000, fileQueue.getMaxOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(100);
+ long currentTimeMillis = System.currentTimeMillis();
+ buffer.putLong(currentTimeMillis);
+ buffer.rewind();
+ fileQueue.append(buffer);
+ Assert.assertEquals(1100, segment1.getMaxOffset());
+
+ segment1.setFull();
+ fileQueue.commit(true);
+ Assert.assertEquals(1100, segment1.getCommitOffset());
+
+ ByteBuffer readBuffer = fileQueue.readAsync(1000, 8).join();
+ Assert.assertEquals(currentTimeMillis, readBuffer.getLong());
+
+ TieredFileSegment segment2 = fileQueue.getFileToWrite();
+ Assert.assertNotEquals(segment1, segment2);
+ segment2.initPosition(segment2.getSize());
+ buffer.rewind();
+ fileQueue.append(buffer);
+ fileQueue.commit(true);
+ readBuffer = fileQueue.readAsync(1000, 1200).join();
+ Assert.assertEquals(currentTimeMillis, readBuffer.getLong(1100));
+ }
+
+ @Test
+ public void testLoadFromMetadata() throws ClassNotFoundException, NoSuchMethodException {
+ TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+ MemoryFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ queue, 100, storeConfig);
+ fileSegment1.initPosition(fileSegment1.getSize());
+ fileSegment1.setFull();
+ metadataStore.updateFileSegment(fileSegment1);
+ metadataStore.updateFileSegment(fileSegment1);
+
+ MemoryFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ queue, 1100, storeConfig);
+ metadataStore.updateFileSegment(fileSegment2);
+
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ queue, storeConfig);
+ Assert.assertEquals(2, fileQueue.needCommitFileSegmentList.size());
+ TieredFileSegment file1 = fileQueue.getFileByIndex(0);
+ Assert.assertNotNull(file1);
+ Assert.assertEquals(100, file1.getBaseOffset());
+ Assert.assertFalse(file1.isFull());
+
+ TieredFileSegment file2 = fileQueue.getFileByIndex(1);
+ Assert.assertNotNull(file2);
+ Assert.assertEquals(1100, file2.getBaseOffset());
+ Assert.assertFalse(file2.isFull());
+
+ TieredFileSegment file3 = fileQueue.getFileByIndex(2);
+ Assert.assertNull(file3);
+ }
+
+ @Test
+ public void testCheckFileSize() throws ClassNotFoundException, NoSuchMethodException {
+ TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+ TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, 100, storeConfig);
+ fileSegment1.initPosition(fileSegment1.getSize() - 100);
+ fileSegment1.setFull(false);
+ metadataStore.updateFileSegment(fileSegment1);
+ metadataStore.updateFileSegment(fileSegment1);
+
+ TieredFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, 1100, storeConfig);
+ fileSegment2.initPosition(fileSegment2.getSize() - 100);
+ metadataStore.updateFileSegment(fileSegment2);
+ metadataStore.updateFileSegment(fileSegment2);
+
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, storeConfig);
+ Assert.assertEquals(1, fileQueue.needCommitFileSegmentList.size());
+
+ fileSegment1 = fileQueue.getFileByIndex(0);
+ Assert.assertTrue(fileSegment1.isFull());
+ Assert.assertEquals(fileSegment1.getSize() + 100, fileSegment1.getCommitOffset());
+
+ fileSegment2 = fileQueue.getFileByIndex(1);
+ Assert.assertEquals(1000, fileSegment2.getCommitPosition());
+
+ fileSegment2.setFull();
+ fileQueue.commit(true);
+ Assert.assertEquals(0, fileQueue.needCommitFileSegmentList.size());
+
+ fileQueue.getFileToWrite();
+ Assert.assertEquals(1, fileQueue.needCommitFileSegmentList.size());
+ }
+
+ @Test
+ public void testCleanExpiredFile() throws ClassNotFoundException, NoSuchMethodException {
+ TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+ TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, 100, storeConfig);
+ fileSegment1.initPosition(fileSegment1.getSize() - 100);
+ fileSegment1.setFull(false);
+ fileSegment1.setEndTimestamp(System.currentTimeMillis() - 1);
+ metadataStore.updateFileSegment(fileSegment1);
+ metadataStore.updateFileSegment(fileSegment1);
+
+ long file1CreateTimeStamp = System.currentTimeMillis();
+
+ TieredFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, 1100, storeConfig);
+ fileSegment2.initPosition(fileSegment2.getSize());
+ fileSegment2.setEndTimestamp(System.currentTimeMillis() + 1);
+ metadataStore.updateFileSegment(fileSegment2);
+ metadataStore.updateFileSegment(fileSegment2);
+
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, storeConfig);
+ Assert.assertEquals(2, fileQueue.getFileSegmentCount());
+
+ fileQueue.cleanExpiredFile(file1CreateTimeStamp);
+ fileQueue.destroyExpiredFile();
+ Assert.assertEquals(1, fileQueue.getFileSegmentCount());
+ Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
+ Assert.assertNotNull(metadataStore.getFileSegment(fileSegment2));
+
+ fileQueue.cleanExpiredFile(Long.MAX_VALUE);
+ fileQueue.destroyExpiredFile();
+ Assert.assertEquals(0, fileQueue.getFileSegmentCount());
+ Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
+ Assert.assertNull(metadataStore.getFileSegment(fileSegment2));
+ }
+
+ @Test
+ public void testRollingNewFile() throws ClassNotFoundException, NoSuchMethodException {
+ TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+ TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, 100, storeConfig);
+ fileSegment1.initPosition(fileSegment1.getSize() - 100);
+ metadataStore.updateFileSegment(fileSegment1);
+
+ TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+ queue, storeConfig);
+ Assert.assertEquals(1, fileQueue.getFileSegmentCount());
+
+ fileQueue.rollingNewFile();
+ Assert.assertEquals(2, fileQueue.getFileSegmentCount());
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java
new file mode 100644
index 000000000..d189aadd9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredFileSegmentTest {
+ public int baseOffset = 1000;
+
+ /**
+ * Builds an in-memory segment of the requested type for queue 0 of topic "TieredFileSegmentTest",
+ * starting at {@link #baseOffset} and backed by a default store configuration.
+ */
+ public TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType) {
+ MessageQueue testQueue = new MessageQueue("TieredFileSegmentTest", "broker", 0);
+ TieredMessageStoreConfig defaultConfig = new TieredMessageStoreConfig();
+ return new MemoryFileSegment(fileType, testQueue, baseOffset, defaultConfig);
+ }
+
+ /**
+ * Appends three messages to a COMMIT_LOG segment, commits it after marking it full, and verifies the
+ * offsets, timestamps, the three messages read back, and the coda that follows them.
+ */
+ @Test
+ public void testCommitLog() {
+ TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
+ segment.initPosition(segment.getSize());
+ long lastSize = segment.getSize();
+ segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+ Assert.assertTrue(segment.needCommit());
+
+ // The third message carries an explicit store timestamp and queue offset that are asserted on below.
+ ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ long msg3StoreTime = System.currentTimeMillis();
+ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, msg3StoreTime);
+ long queueOffset = baseOffset * 1000L;
+ buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, queueOffset);
+ segment.append(buffer, msg3StoreTime);
+
+ Assert.assertEquals(baseOffset, segment.getBaseOffset());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+ Assert.assertEquals(0, segment.getBeginTimestamp());
+ Assert.assertEquals(msg3StoreTime, segment.getEndTimestamp());
+
+ segment.setFull();
+ segment.commit();
+ Assert.assertFalse(segment.needCommit());
+ Assert.assertEquals(segment.getMaxOffset(), segment.getCommitOffset());
+ Assert.assertEquals(queueOffset, segment.getCommitMsgQueueOffset());
+
+ // Each message read back must report the commit log offset at which it was stored.
+ ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+ ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+ ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+
+ // The coda follows the last message; the long at byte offset 8 must equal the last store timestamp.
+ ByteBuffer coda = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 3, TieredCommitLog.CODA_SIZE);
+ Assert.assertEquals(msg3StoreTime, coda.getLong(4 + 4));
+ }
+
+ /**
+ * Creates a consume queue entry ready for reading: the given commit log offset (long), then the
+ * int 2 and the long 3.
+ */
+ private ByteBuffer buildConsumeQueue(long commitLogOffset) {
+ ByteBuffer entry = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ entry.putLong(commitLogOffset).putInt(2).putLong(3);
+ entry.flip();
+ return entry;
+ }
+
+ /**
+ * Appends three consume queue entries to a CONSUME_QUEUE segment, commits, and verifies the offsets,
+ * timestamps and the commit log offset stored in each entry read back.
+ */
+ @Test
+ public void testConsumeQueue() {
+ TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE);
+ segment.initPosition(segment.getSize());
+ long lastSize = segment.getSize();
+ segment.append(buildConsumeQueue(baseOffset), 0);
+ segment.append(buildConsumeQueue(baseOffset + MessageBufferUtilTest.MSG_LEN), 0);
+ long cqItem3Timestamp = System.currentTimeMillis();
+ segment.append(buildConsumeQueue(baseOffset + MessageBufferUtilTest.MSG_LEN * 2), cqItem3Timestamp);
+
+ Assert.assertEquals(baseOffset + lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3, segment.getMaxOffset());
+ Assert.assertEquals(0, segment.getBeginTimestamp());
+ Assert.assertEquals(cqItem3Timestamp, segment.getEndTimestamp());
+
+ segment.commit();
+ Assert.assertEquals(segment.getMaxOffset(), segment.getCommitOffset());
+
+ // The first long of every entry is the commit log offset passed to buildConsumeQueue().
+ ByteBuffer cqItem1 = segment.read(lastSize, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ Assert.assertEquals(baseOffset, cqItem1.getLong());
+
+ ByteBuffer cqItem2 = segment.read(lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ Assert.assertEquals(baseOffset + MessageBufferUtilTest.MSG_LEN, cqItem2.getLong());
+
+ ByteBuffer cqItem3 = segment.read(lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ Assert.assertEquals(baseOffset + MessageBufferUtilTest.MSG_LEN * 2, cqItem3.getLong());
+ }
+
+ /**
+ * Simulates a failed commit followed by a successful one. A helper thread appends a third message after
+ * about one second and then completes the mock's {@code blocker} with {@code false}; the test commits
+ * once against that blocker, commits again with a blocker completed with {@code true}, and expects all
+ * three messages to be committed and readable.
+ */
+ @Test
+ public void testCommitFailed() {
+ long startTime = System.currentTimeMillis();
+ MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
+ long lastSize = segment.getSize();
+ segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+
+ segment.blocker = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // NOTE(review): Assert.fail here throws in the helper thread only, so it cannot fail this test.
+ Assert.fail(e.getMessage());
+ }
+ ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
+ segment.append(buffer, 0);
+ segment.blocker.complete(false);
+ }).start();
+
+ // First commit runs against the blocker that will complete with false (the failure case).
+ segment.commit();
+ segment.blocker.join();
+
+ // Second commit runs against a blocker that is already completed with true.
+ segment.blocker = new CompletableFuture<>();
+ segment.blocker.complete(true);
+ segment.commit();
+
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+
+ ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+ ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+ ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java
new file mode 100644
index 000000000..f4f517f65
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TieredIndexFileTest {
+ MessageQueue mq;
+ TieredMessageStoreConfig storeConfig;
+ TieredMetadataStore metadataStore;
+
+ /**
+ * Prepares a store config rooted at the "rmqut" subdirectory of the system temp directory, with
+ * {@code MemoryFileSegment} as the backend provider and index file limits of 2 hash slots and 3 indexes.
+ */
+ @Before
+ public void setUp() {
+ // With checkSize off, MemoryFileSegment.getSize() reports 0 instead of 1000.
+ MemoryFileSegment.checkSize = false;
+ mq = new MessageQueue("TieredIndexFileTest", "broker", 1);
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.store.tiered.mock.MemoryFileSegment");
+ // Small limits; presumably chosen so the index file fills up after a few appends -- TODO confirm.
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
+ metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ MemoryFileSegment.checkSize = true;
+ // Delete the directory setUp() configured as the store root; a hard-coded "/tmp/rmqut" misses it wherever the temp dir differs.
+ FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "rmqut"));
+ }
+
+ /**
+ * Appends index entries and polls {@code queryAsync} (up to 5 seconds per wait) until they are visible,
+ * then checks the size and selected fields of the returned index buffers. Skipped on Windows.
+ */
+ @Test
+ public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
+ // skip this test on windows
+ Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
+
+ TieredIndexFile indexFile = new TieredIndexFile(storeConfig);
+ indexFile.append(mq, 0, "key3", 3, 300, 1000);
+ indexFile.append(mq, 0, "key2", 2, 200, 1100);
+ indexFile.append(mq, 0, "key1", 1, 100, 1200);
+
+ // Returning false keeps Awaitility polling until the query yields exactly one result.
+ Awaitility.waitAtMost(5, TimeUnit.SECONDS)
+ .until(() -> {
+ List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
+ if (indexList.size() != 1) {
+ return false;
+ }
+
+ ByteBuffer indexBuffer = indexList.get(0).getValue();
+ Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining());
+
+ Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4));
+ Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8));
+ Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+
+ Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4));
+ Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8));
+ Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4));
+ return true;
+ });
+
+ // Three identical appends for "key4", all with timestamp 1300.
+ indexFile.append(mq, 0, "key4", 4, 400, 1300);
+ indexFile.append(mq, 0, "key4", 4, 400, 1300);
+ indexFile.append(mq, 0, "key4", 4, 400, 1300);
+
+ Awaitility.waitAtMost(5, TimeUnit.SECONDS)
+ .until(() -> {
+ List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join();
+ if (indexList.size() != 1) {
+ return false;
+ }
+
+ ByteBuffer indexBuffer = indexList.get(0).getValue();
+ Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
+ Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
+ Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
+ Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ return true;
+ });
+
+ // "key1" was appended with timestamp 1200, so the window [1300, 1300] must return nothing.
+ List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join();
+ Assert.assertEquals(0, indexList.size());
+
+ // Widening the window to [1200, 1300] for "key4" is expected to return two results.
+ indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join();
+ Assert.assertEquals(2, indexList.size());
+
+ ByteBuffer indexBuffer = indexList.get(0).getValue();
+ Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
+ Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
+ Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
+ Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+
+ indexBuffer = indexList.get(1).getValue();
+ Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining());
+ Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4));
+ Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8));
+ Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
index ff73c173a..a1c5b861a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
@@ -18,10 +18,16 @@ package org.apache.rocketmq.store.tiered.metadata;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -30,19 +36,20 @@ import org.junit.Test;
public class MetadataStoreTest {
MessageQueue mq;
TieredMessageStoreConfig storeConfig;
- TieredStoreMetadataStore metadataStore;
+ TieredMetadataStore metadataStore;
@Before
public void setUp() {
- mq = new MessageQueue("MetadataStoreTest", "broker", 1);
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir("/tmp/rmqut");
- metadataStore = new TieredStoreMetadataManager(storeConfig);
+ storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+ mq = new MessageQueue("MetadataStoreTest", storeConfig.getBrokerName(), 1);
+ metadataStore = new TieredMetadataManager(storeConfig);
}
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File("/tmp/rmqut"));
+ // Delete the directory setUp() configured as the store root instead of a hard-coded "/tmp/rmqut".
+ FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "rmqut"));
+ TieredStoreUtil.getMetadataStore(storeConfig).destroy();
}
@Test
@@ -120,15 +127,54 @@ public class MetadataStoreTest {
Assert.assertNotNull(metadataStore.getTopic(mq.getTopic() + "1"));
}
+ /**
+ * Covers file segment metadata handling: the metadata returned by {@code updateFileSegment} as a segment
+ * is initialised, marked full and committed, iteration over the segments of a queue, and deletion.
+ */
+ @Test
+ public void testFileSegment() {
+ MemoryFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ mq,
+ 100,
+ storeConfig);
+ fileSegment1.initPosition(fileSegment1.getSize());
+ FileSegmentMetadata metadata1 = metadataStore.updateFileSegment(fileSegment1);
+ Assert.assertEquals(mq, metadata1.getQueue());
+ Assert.assertEquals(TieredFileSegment.FileSegmentType.COMMIT_LOG, TieredFileSegment.FileSegmentType.valueOf(metadata1.getType()));
+ Assert.assertEquals(100, metadata1.getBaseOffset());
+ Assert.assertEquals(0, metadata1.getSealTimestamp());
+
+ // Marking the segment full alone is expected to leave the seal timestamp at 0.
+ fileSegment1.setFull();
+ metadata1 = metadataStore.updateFileSegment(fileSegment1);
+ Assert.assertEquals(1000, metadata1.getSize());
+ Assert.assertEquals(0, metadata1.getSealTimestamp());
+
+ // After the commit the size is expected to grow by the coda and the seal timestamp to be set.
+ fileSegment1.commit();
+ metadata1 = metadataStore.updateFileSegment(fileSegment1);
+ Assert.assertEquals(1000 + TieredCommitLog.CODA_SIZE, metadata1.getSize());
+ Assert.assertTrue(metadata1.getSealTimestamp() > 0);
+
+ MemoryFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+ mq,
+ 1100,
+ storeConfig);
+ metadataStore.updateFileSegment(fileSegment2);
+ // Both segments must be visited, in the order of their base offsets as asserted below.
+ List<FileSegmentMetadata> list = new ArrayList<>();
+ metadataStore.iterateFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, "MetadataStoreTest", 1, list::add);
+ Assert.assertEquals(2, list.size());
+ Assert.assertEquals(100, list.get(0).getBaseOffset());
+ Assert.assertEquals(1100, list.get(1).getBaseOffset());
+
+ Assert.assertNotNull(metadataStore.getFileSegment(fileSegment1));
+ metadataStore.deleteFileSegment(fileSegment1);
+ Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
+ }
+
@Test
public void testReload() {
- TieredStoreMetadataManager metadataManager = (TieredStoreMetadataManager) metadataStore;
+ TieredMetadataManager metadataManager = (TieredMetadataManager) metadataStore;
metadataManager.addTopic(mq.getTopic(), 1);
metadataManager.addQueue(mq, 2);
metadataManager.persist();
Assert.assertTrue(new File(metadataManager.configFilePath()).exists());
- metadataManager = new TieredStoreMetadataManager(storeConfig);
+ metadataManager = new TieredMetadataManager(storeConfig);
metadataManager.load();
TopicMetadata topicMetadata = metadataManager.getTopic(mq.getTopic());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java
new file mode 100644
index 000000000..0071963cf
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.mock;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+import org.junit.Assert;
+
+public class MemoryFileSegment extends TieredFileSegment {
+ private final ByteBuffer memStore;
+
+ public CompletableFuture<Boolean> blocker;
+
+ public static boolean checkSize = true;
+
+ /**
+ * Creates an in-memory segment backed by a 10000-byte heap buffer whose position starts at
+ * {@link #getSize()}.
+ *
+ * @param fileType segment type; COMMIT_LOG, CONSUME_QUEUE and INDEX are supported
+ * @param messageQueue queue the segment belongs to
+ * @param baseOffset base offset of the segment
+ * @param storeConfig tiered store configuration
+ * @throws IllegalArgumentException if {@code fileType} is not one of the supported types
+ */
+ public MemoryFileSegment(TieredFileSegment.FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
+ TieredMessageStoreConfig storeConfig) {
+ super(fileType, messageQueue, baseOffset, storeConfig);
+ switch (fileType) {
+ case COMMIT_LOG:
+ case CONSUME_QUEUE:
+ case INDEX:
+ memStore = ByteBuffer.allocate(10000);
+ break;
+ default:
+ // Fail with a clear message; a null buffer would only surface as an NPE on the next statement.
+ throw new IllegalArgumentException("Unsupported file segment type: " + fileType);
+ }
+ memStore.position((int) getSize());
+ }
+
+ /** Returns a synthetic path: "/tiered/", the file type, the platform file separator, then the base offset. */
+ @Override public String getPath() {
+ return "/tiered/" + fileType + File.separator + baseOffset;
+ }
+
+ /** Reports a fixed size of 1000 while {@code checkSize} is enabled, and 0 otherwise. */
+ @Override public long getSize() {
+ return checkSize ? 1000 : 0;
+ }
+
+ /** Intentionally empty: this mock creates no file. */
+ @Override protected void createFile() {
+
+ }
+
+ /**
+ * Returns an already-completed future with a view of {@code length} bytes of the backing buffer starting
+ * at {@code position}. The view is sliced from a duplicate, so the backing buffer's own position is untouched.
+ */
+ @Override protected CompletableFuture<ByteBuffer> read0(long position, int length) {
+ ByteBuffer cursor = memStore.duplicate();
+ cursor.position((int) position);
+ ByteBuffer window = cursor.slice();
+ window.limit(length);
+ return CompletableFuture.completedFuture(window);
+ }
+
+ /**
+ * Copies the stream into {@code memStore} at the buffer's current position and reports success.
+ * <p>
+ * When {@code blocker} is set, this method first waits for it; a {@code false} result makes it throw
+ * {@link IllegalStateException}, which {@code TieredFileSegmentTest.testCommitFailed} uses as a failed commit.
+ *
+ * @param inputStream data to store
+ * @param position target position; only compared with {@link #getSize()} while {@code checkSize} is on
+ * @param length number of bytes the stream is expected to yield
+ * @param append not used by this mock
+ * @return a completed future holding {@code true}
+ */
+ @Override
+ protected CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+ boolean append) {
+ try {
+ if (blocker != null && !blocker.get()) {
+ throw new IllegalStateException();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ Assert.assertTrue(!checkSize || position >= getSize());
+
+ byte[] buffer = new byte[1024];
+
+ int startPos = memStore.position();
+ try {
+ int len;
+ while ((len = inputStream.read(buffer)) > 0) {
+ memStore.put(buffer, 0, len);
+ }
+ Assert.assertEquals(length, memStore.position() - startPos);
+ } catch (Exception e) {
+ // Assert.fail always throws, so the return below is never reached.
+ Assert.fail(e.getMessage());
+ return CompletableFuture.completedFuture(false);
+ }
+ return CompletableFuture.completedFuture(true);
+ }
+
+ /** Always reports {@code false}. */
+ @Override protected boolean exists() {
+ return false;
+ }
+
+ /** Intentionally empty: this mock performs no cleanup. */
+ @Override protected void destroyFile() {
+
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java
new file mode 100644
index 000000000..d0e4932c9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+ /** Tests for the {@code CQItemBufferUtil} field accessors, run against one shared consume queue item. */
+ public class CQItemBufferUtilTest {
+ private static ByteBuffer cqItem;
+
+ /** Builds the shared item: commit log offset 1, size 2, tag code 3. */
+ @BeforeClass
+ public static void setUp() {
+ ByteBuffer item = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ item.putLong(1).putInt(2).putLong(3);
+ item.flip();
+ cqItem = item;
+ }
+
+ @Test
+ public void testGetCommitLogOffset() {
+ Assert.assertEquals(1, CQItemBufferUtil.getCommitLogOffset(cqItem));
+ }
+
+ @Test
+ public void testGetSize() {
+ Assert.assertEquals(2, CQItemBufferUtil.getSize(cqItem));
+ }
+
+ @Test
+ public void testGetTagCode() {
+ Assert.assertEquals(3, CQItemBufferUtil.getTagCode(cqItem));
+ }
+ }
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java
new file mode 100644
index 000000000..739629a56
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredConsumeQueue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageBufferUtilTest {
+ /**
+ * Byte length of the message produced by {@link #buildMessageBuffer()}: the sum of the field widths
+ * listed below, with an empty body, an empty topic and 30 bytes of properties.
+ */
+ public static final int MSG_LEN = 4 //TOTALSIZE
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + 8 //BORNHOST
+ + 8 //STORETIMESTAMP
+ + 8 //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + 0 //BODY
+ + 2 + 0 //TOPIC
+ + 2 + 30 //properties
+ + 0;
+
+ /**
+ * Builds a message buffer of exactly {@link #MSG_LEN} bytes, flipped and ready for reading. Most fields
+ * hold their ordinal in the layout (queue offset 6, physical offset 7, store timestamp 11, ...); the body
+ * and topic are empty and the properties hold a unique-key entry plus {@code userkey=uservalue0}.
+ *
+ * @return the populated buffer, positioned at 0 with {@code MSG_LEN} bytes remaining
+ */
+ public static ByteBuffer buildMessageBuffer() {
+ // Initialization of storage space
+ ByteBuffer buffer = ByteBuffer.allocate(MSG_LEN);
+ // 1 TOTALSIZE
+ buffer.putInt(MSG_LEN);
+ // 2 MAGICCODE
+ buffer.putInt(MessageDecoder.MESSAGE_MAGIC_CODE_V2);
+ // 3 BODYCRC
+ buffer.putInt(3);
+ // 4 QUEUEID
+ buffer.putInt(4);
+ // 5 FLAG
+ buffer.putInt(5);
+ // 6 QUEUEOFFSET
+ buffer.putLong(6);
+ // 7 PHYSICALOFFSET
+ buffer.putLong(7);
+ // 8 SYSFLAG
+ buffer.putInt(8);
+ // 9 BORNTIMESTAMP
+ buffer.putLong(9);
+ // 10 BORNHOST
+ buffer.putLong(10);
+ // 11 STORETIMESTAMP
+ buffer.putLong(11);
+ // 12 STOREHOSTADDRESS
+ buffer.putLong(10);
+ // 13 RECONSUMETIMES
+ buffer.putInt(13);
+ // 14 Prepared Transaction Offset
+ buffer.putLong(14);
+ // 15 BODY
+ buffer.putInt(0);
+ // 16 TOPIC
+ buffer.putShort((short) 0);
+ // 17 PROPERTIES
+ Map<String, String> map = new HashMap<>();
+ map.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk");
+ map.put("userkey", "uservalue0");
+ String properties = MessageDecoder.messageProperties2String(map);
+ byte[] propertiesBytes = properties.getBytes(StandardCharsets.UTF_8);
+ buffer.putShort((short) propertiesBytes.length);
+ buffer.put(propertiesBytes);
+ buffer.flip();
+
+ // Guards the hand-computed MSG_LEN against drifting from the bytes actually written above.
+ Assert.assertEquals(MSG_LEN, buffer.remaining());
+ return buffer;
+ }
+
+ /** The total size read from a freshly built message must equal {@code MSG_LEN}. */
+ @Test
+ public void testGetTotalSize() {
+ Assert.assertEquals(MSG_LEN, MessageBufferUtil.getTotalSize(buildMessageBuffer()));
+ }
+
+ /** The magic code read from a freshly built message must be the V2 message magic code. */
+ @Test
+ public void testGetMagicCode() {
+ Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, MessageBufferUtil.getMagicCode(buildMessageBuffer()));
+ }
+
+ /**
+ * Exercises {@code splitMessageBuffer} on a buffer laid out as message, coda, message, paired with
+ * consume queue entries that match, under-report, over-report or point at the coda.
+ */
+ @Test
+ public void testSplitMessages() {
+ // Message buffer layout: msgBuffer1 | coda (msgBuffer2) | msgBuffer3.
+ ByteBuffer msgBuffer1 = buildMessageBuffer();
+ msgBuffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);
+ ByteBuffer msgBuffer2 = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+
+ msgBuffer2.putInt(TieredCommitLog.CODA_SIZE);
+ msgBuffer2.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+ msgBuffer2.putLong(System.currentTimeMillis());
+ msgBuffer2.flip();
+
+ ByteBuffer msgBuffer3 = buildMessageBuffer();
+ msgBuffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);
+
+ ByteBuffer msgBuffer = ByteBuffer.allocate(msgBuffer1.remaining() + msgBuffer2.remaining() + msgBuffer3.remaining());
+ msgBuffer.put(msgBuffer1);
+ msgBuffer.put(msgBuffer2);
+ msgBuffer.put(msgBuffer3);
+ msgBuffer.flip();
+
+ // cqBuffer1: first message at commit log offset 1000.
+ ByteBuffer cqBuffer1 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer1.putLong(1000);
+ cqBuffer1.putInt(MSG_LEN);
+ cqBuffer1.putLong(0);
+ cqBuffer1.flip();
+
+ // cqBuffer2: the message after the coda, with the correct size.
+ ByteBuffer cqBuffer2 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer2.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+ cqBuffer2.putInt(MSG_LEN);
+ cqBuffer2.putLong(0);
+ cqBuffer2.flip();
+
+ // cqBuffer3: offset of the coda itself (directly after the first message).
+ ByteBuffer cqBuffer3 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer3.putLong(1000 + MSG_LEN);
+ cqBuffer3.putInt(MSG_LEN);
+ cqBuffer3.putLong(0);
+ cqBuffer3.flip();
+
+ // cqBuffer4: same offset as cqBuffer2 but a size 10 bytes too small.
+ ByteBuffer cqBuffer4 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer4.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+ cqBuffer4.putInt(MSG_LEN - 10);
+ cqBuffer4.putLong(0);
+ cqBuffer4.flip();
+
+ // cqBuffer5: same offset as cqBuffer2 but a size ten times too large.
+ ByteBuffer cqBuffer5 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer5.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+ cqBuffer5.putInt(MSG_LEN * 10);
+ cqBuffer5.putLong(0);
+ cqBuffer5.flip();
+
+ // Case 1: two matching entries yield both messages, the second located after the coda.
+ ByteBuffer cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
+ cqBuffer.put(cqBuffer1);
+ cqBuffer.put(cqBuffer2);
+ cqBuffer.flip();
+ // put() consumed the source buffers; rewind them so they can be reused below.
+ cqBuffer1.rewind();
+ cqBuffer2.rewind();
+ List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ Assert.assertEquals(2, msgList.size());
+ Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+ Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+
+ // Case 2: the second entry under-reports the size, so only the first message is returned.
+ cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
+ cqBuffer.put(cqBuffer1);
+ cqBuffer.put(cqBuffer4);
+ cqBuffer.flip();
+ cqBuffer1.rewind();
+ cqBuffer4.rewind();
+ msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ Assert.assertEquals(1, msgList.size());
+ Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+
+ // Case 3: the second entry points at the coda; the result still locates the message after it.
+ // NOTE(review): room for three entries is allocated but only two are put; flip() limits the buffer to two.
+ cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3);
+ cqBuffer.put(cqBuffer1);
+ cqBuffer.put(cqBuffer3);
+ cqBuffer.flip();
+ msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ Assert.assertEquals(2, msgList.size());
+ Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+ Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+
+ // Case 4: a single entry whose size exceeds the message yields no result.
+ cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ cqBuffer.put(cqBuffer5);
+ cqBuffer.flip();
+ msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ Assert.assertEquals(0, msgList.size());
+ }
+
+ /** The queue offset of a freshly built message is the 6 written by {@code buildMessageBuffer()}. */
+ @Test
+ public void testGetQueueOffset() {
+ Assert.assertEquals(6, MessageBufferUtil.getQueueOffset(buildMessageBuffer()));
+ }
+
+ /** The store timestamp of a freshly built message is the 11 written by {@code buildMessageBuffer()}. */
+ @Test
+ public void testGetStoreTimeStamp() {
+ Assert.assertEquals(11, MessageBufferUtil.getStoreTimeStamp(buildMessageBuffer()));
+ }
+
+ /**
+ * Overwrites the store host field of a test message with 255.255.255.255:65535 and checks that
+ * {@code getOffsetId} matches the id {@code MessageDecoder.createMessageId} builds for that host and offset 7.
+ */
+ @Test
+ public void testGetOffsetId() {
+ ByteBuffer message = buildMessageBuffer();
+ InetSocketAddress storeHost = new InetSocketAddress("255.255.255.255", 65535);
+ ByteBuffer hostBytes = ByteBuffer.allocate(Long.BYTES);
+ hostBytes.put(storeHost.getAddress().getAddress(), 0, 4);
+ hostBytes.putInt(storeHost.getPort());
+ hostBytes.flip();
+ // Absolute get/put keep both buffer positions unchanged while the host bytes are copied in.
+ int hostLength = hostBytes.remaining();
+ for (int offset = 0; offset < hostLength; offset++) {
+ message.put(MessageBufferUtil.STORE_HOST_POSITION + offset, hostBytes.get(offset));
+ }
+ String expected = MessageDecoder.createMessageId(ByteBuffer.allocate(TieredStoreUtil.MSG_ID_LENGTH), hostBytes, 7);
+ Assert.assertEquals(expected, MessageBufferUtil.getOffsetId(message));
+ }
+
+ /** The properties parsed from a test message are exactly the two entries written by {@code buildMessageBuffer()}. */
+ @Test
+ public void testGetProperties() {
+ Map<String, String> parsed = MessageBufferUtil.getProperties(buildMessageBuffer());
+ String uniqKey = MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+ Assert.assertEquals(2, parsed.size());
+ Assert.assertTrue(parsed.containsKey(uniqKey));
+ Assert.assertEquals("uk", parsed.get(uniqKey));
+ Assert.assertTrue(parsed.containsKey("userkey"));
+ Assert.assertEquals("uservalue0", parsed.get("userkey"));
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java
new file mode 100644
index 000000000..1bb462d8b
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static helpers of {@link TieredStoreUtil}: hashing, the mapping between
+ * offsets and file names, and human-readable size formatting.
+ */
+public class TieredStoreUtilTest {
+
+ /**
+ * Byte counts mapped to the text {@link TieredStoreUtil#toHumanReadable} is expected to return for them.
+ */
+ private static final Map<Long, String> DATA_MAP = new HashMap<>();
+
+ // Filled in a static initializer instead of double-brace initialization, which would
+ // declare an extra anonymous HashMap subclass only to populate the map.
+ static {
+ DATA_MAP.put(0L, "0Bytes");
+ DATA_MAP.put(1023L, "1023Bytes");
+ DATA_MAP.put(1024L, "1KB");
+ DATA_MAP.put(12_345L, "12.06KB");
+ DATA_MAP.put(10_123_456L, "9.65MB");
+ DATA_MAP.put(10_123_456_798L, "9.43GB");
+ DATA_MAP.put(1_777_777_777_777_777_777L, "1.54EB");
+ }
+
+ /** Pins the hash string returned for a fixed input. */
+ @Test
+ public void getHash() {
+ Assert.assertEquals("161c08ff", TieredStoreUtil.getHash("TieredStorageDailyTest"));
+ }
+
+ /** Pins the file names generated for offset 0 and for an offset above the 32-bit range. */
+ @Test
+ public void testOffset2FileName() {
+ Assert.assertEquals("cfcd208400000000000000000000", TieredStoreUtil.offset2FileName(0));
+ Assert.assertEquals("b10da56800000000004294937144", TieredStoreUtil.offset2FileName(4294937144L));
+ }
+
+ /** Checks that the file names used in {@link #testOffset2FileName()} parse back to their offsets. */
+ @Test
+ public void testFileName2Offset() {
+ Assert.assertEquals(0, TieredStoreUtil.fileName2Offset("cfcd208400000000000000000000"));
+ Assert.assertEquals(4294937144L, TieredStoreUtil.fileName2Offset("b10da56800000000004294937144"));
+ }
+
+ /** Checks every entry of {@link #DATA_MAP}. */
+ @Test
+ public void testToHumanReadable() {
+ // HashMap iteration order is unspecified, so the input is put into the failure
+ // message to make it clear which entry did not match.
+ DATA_MAP.forEach((size, expected) ->
+ Assert.assertEquals("toHumanReadable(" + size + ")", expected, TieredStoreUtil.toHumanReadable(size)));
+ }
+}
diff --git a/tieredstore/src/test/resources/logback.xml b/tieredstore/src/test/resources/logback.xml
new file mode 100644
index 000000000..b70b42046
--- /dev/null
+++ b/tieredstore/src/test/resources/logback.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configuration>
+ <!-- Logback configuration on the test classpath: a single console appender. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <!-- Layout: time of day, thread name, level left-justified to 5 characters,
+ logger name abbreviated towards 36 characters, message, line separator. -->
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Root logger: events at DEBUG level and above are sent to the STDOUT appender. -->
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>