You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/09/07 09:02:21 UTC
[rocketmq] branch 5.0.0-beta-compact updated: Support replicate between master and slave (#4998)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch 5.0.0-beta-compact
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-compact by this push:
new 07185a6ff Support replicate between master and slave (#4998)
07185a6ff is described below
commit 07185a6ff6588a5ddeec91d2d43bef2d0c95dcf5
Author: ltamber <lt...@gmail.com>
AuthorDate: Wed Sep 7 17:02:00 2022 +0800
Support replicate between master and slave (#4998)
* replicate from master
* fix unit test
* fix unit test
* LogAndCq
* combine log and CQ
* replace file
* clean after replicating
* unit test
* bug fix
* dump command
* compaction cycle and dump log tool
* fix the first compaction not being triggered
* optimize stop pull logic
* fix checkstyle
* fix replace error when compacting replicating and current logs
* force clean cq in replicating
* force clean cq in replicating
* mapped file queue empty judgement fix
* clean compacting folder
* validate cqUnit when getMessage
* clear files when replace completed
* optimize sparse consume queue recover
* disable allocate mapped file & separate compaction load and start
* code optimize
* fix compaction error
* checkstyle
* checkstyle
* compactionLog size based on sparse consume queue size
* bugfix
* fix
* fix unit test
---
.../apache/rocketmq/broker/BrokerController.java | 1 +
.../java/org/apache/rocketmq/store/CommitLog.java | 2 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 10 +-
.../org/apache/rocketmq/store/MappedFileQueue.java | 4 +
.../rocketmq/store/config/MessageStoreConfig.java | 2 +-
.../apache/rocketmq/store/kv/CompactionLog.java | 564 +++++++++++++++------
.../rocketmq/store/kv/CompactionPositionMgr.java | 4 +
.../rocketmq/store/kv/CompactionService.java | 22 +-
.../apache/rocketmq/store/kv/CompactionStore.java | 64 ++-
.../apache/rocketmq/store/kv/MessageFetcher.java | 200 ++++++++
.../rocketmq/store/queue/SparseConsumeQueue.java | 113 ++++-
.../rocketmq/store/kv/CompactionLogTest.java | 72 ++-
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../command/message/DumpCompactionLogCommand.java | 109 ++++
14 files changed, 953 insertions(+), 216 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2fdb6f83d..b49ed5dd5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1650,6 +1650,7 @@ public class BrokerController {
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+ this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 82ceaae9b..cf1a7f857 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -525,7 +525,7 @@ public class CommitLog implements Swappable {
return dispatchRequest;
} catch (Exception e) {
- log.error("CheckMessageAndReturnSizeOld", e);
+
}
return new DispatchRequest(-1, false /* success */);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 81aa99c53..b1a5247ab 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -211,8 +211,6 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool = new TransientStorePool(messageStoreConfig);
- this.compactionService.start();
-
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -284,6 +282,8 @@ public class DefaultMessageStore implements MessageStore {
// load Consume Queue
result = result && this.consumeQueueStore.load();
+ result = result && this.compactionService.load(lastExitOK);
+
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
@@ -349,6 +349,7 @@ public class DefaultMessageStore implements MessageStore {
this.flushConsumeQueueService.start();
this.commitLog.start();
+ this.compactionService.start();
this.storeStatsService.start();
if (this.haService != null) {
@@ -1213,6 +1214,9 @@ public class DefaultMessageStore implements MessageStore {
if (this.haService != null) {
this.haService.updateMasterAddress(newAddr);
}
+ if (this.compactionService != null) {
+ this.compactionService.updateMasterAddress(newAddr);
+ }
}
@Override
@@ -2355,7 +2359,7 @@ public class DefaultMessageStore implements MessageStore {
}
}
- compactionStore.flushCq(flushConsumeQueueLeastPages);
+ compactionStore.flushCQ(flushConsumeQueueLeastPages);
if (0 == flushConsumeQueueLeastPages) {
if (logicsMsgTimestamp > 0) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index b9b7a1fdb..5b7c84ad0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -776,4 +776,8 @@ public class MappedFileQueue implements Swappable {
public long getTotalFileSize() {
return (long) mappedFileSize * mappedFiles.size();
}
+
+ public String getStorePath() {
+ return storePath;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index eb34fbdb6..da27b1dd6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -44,7 +44,7 @@ public class MessageStoreConfig {
// CompactionLog consumeQueue file size, default is 10M
private int compactionCqMappedFileSize = 10 * 1024 * 1024;
- private int compactionScheduleInternal = 15 * 60 * 60;
+ private int compactionScheduleInternal = 15 * 60 * 1000;
private int maxOffsetMapSize = 100 * 1024 * 1024;
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
index 1ed6ea1a5..e5fdae714 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
@@ -57,13 +57,12 @@ import java.security.DigestException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.rocketmq.store.CommitLog.BLANK_MAGIC_CODE;
@@ -73,101 +72,197 @@ public class CompactionLog {
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private static final int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
- public static final String SUB_FOLDER = "compacting";
+ public static final String COMPACTING_SUB_FOLDER = "compacting";
+ public static final String REPLICATING_SUB_FOLDER = "replicating";
private final int compactionLogMappedFileSize;
private final int compactionCqMappedFileSize;
private final String compactionLogFilePath;
private final String compactionCqFilePath;
private final MessageStore defaultMessageStore;
+ private final CompactionStore compactionStore;
private final MessageStoreConfig messageStoreConfig;
-
private final CompactionAppendMsgCallback endMsgCallback;
-
private final String topic;
private final int queueId;
private final int offsetMapMemorySize;
-
private final PutMessageLock putMessageLock;
private final PutMessageLock readMessageLock;
- private MappedFileQueue currentMappedFileQueue;
- private MappedFileQueue newMappedFileQueue;
- private SparseConsumeQueue currentBcq;
- private SparseConsumeQueue compactionBcq;
+ private TopicPartitionLog current;
+ private TopicPartitionLog compacting;
+ private TopicPartitionLog replicating;
private CompactionPositionMgr positionMgr;
- private AtomicBoolean compacting = new AtomicBoolean(false);
+ private AtomicReference<State> state;
- public CompactionLog(final MessageStore messageStore, final String topic, final int queueId,
- final int offsetMapMemorySize, CompactionPositionMgr positionMgr, String compactionLogStoreRootPath,
- String compactionCqStoreRootPath)
+ public CompactionLog(final MessageStore messageStore, final CompactionStore compactionStore, final String topic, final int queueId)
throws IOException {
this.topic = topic;
this.queueId = queueId;
- this.offsetMapMemorySize = offsetMapMemorySize;
this.defaultMessageStore = messageStore;
+ this.compactionStore = compactionStore;
this.messageStoreConfig = messageStore.getMessageStoreConfig();
- this.compactionLogMappedFileSize = messageStoreConfig.getCompactionMappedFileSize();
+ this.offsetMapMemorySize = compactionStore.getOffsetMapSize();
this.compactionCqMappedFileSize =
messageStoreConfig.getCompactionCqMappedFileSize() / BatchConsumeQueue.CQ_STORE_UNIT_SIZE
* BatchConsumeQueue.CQ_STORE_UNIT_SIZE;
- this.compactionLogFilePath = Paths.get(compactionLogStoreRootPath, topic, String.valueOf(queueId)).toString();
- this.compactionCqFilePath = compactionCqStoreRootPath; // batch consume queue already separated
+ this.compactionLogMappedFileSize = getCompactionLogSize(compactionCqMappedFileSize,
+ messageStoreConfig.getCompactionMappedFileSize());
+ this.compactionLogFilePath = Paths.get(compactionStore.getCompactionLogPath(),
+ topic, String.valueOf(queueId)).toString();
+ this.compactionCqFilePath = compactionStore.getCompactionCqPath(); // batch consume queue already separated
+ this.positionMgr = compactionStore.getPositionMgr();
- this.currentMappedFileQueue = new MappedFileQueue(compactionLogFilePath, compactionLogMappedFileSize,
- defaultMessageStore.getAllocateMappedFileService());
this.putMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
new PutMessageSpinLock();
this.readMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
new PutMessageSpinLock();
- this.positionMgr = positionMgr;
this.endMsgCallback = new CompactionAppendEndMsgCallback();
- this.load();
- log.info("CompactionLog init completed.");
+ this.state = new AtomicReference<>(State.INITIALIZING);
+// this.load();
+ log.info("CompactionLog {}:{} init completed.", topic, queueId);
}
- private void load() throws IOException, RuntimeException {
- if (!this.currentMappedFileQueue.load()) {
- throw new IOException("load compactionLog exception");
+ private int getCompactionLogSize(int cqSize, int origLogSize) {
+ int n = origLogSize / cqSize;
+ if (n < 5) {
+ return cqSize * 5;
+ }
+ int m = origLogSize % cqSize;
+ if (m > 0 && m < (cqSize >> 1)) {
+ return n * cqSize;
+ } else {
+ return (n + 1) * cqSize;
}
- loadCq();
- recover();
- sanityCheck();
}
- private void loadCq() {
- SparseConsumeQueue bcq = new SparseConsumeQueue(topic, queueId, compactionCqFilePath,
- compactionCqMappedFileSize, defaultMessageStore);
- bcq.load();
- bcq.recover();
- currentBcq = bcq;
+ public void load(boolean exitOk) throws IOException, RuntimeException {
+ initLogAndCq(exitOk);
+ if (defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
+ && getLog().isMappedFilesEmpty()) {
+ log.info("{}:{} load compactionLog from remote master", topic, queueId);
+ loadFromRemoteAsync();
+ } else {
+ state.compareAndSet(State.INITIALIZING, State.NORMAL);
+ }
}
- private void recover() {
- long maxCqPhysicOffset = currentBcq.getMaxPhysicOffset();
- if (maxCqPhysicOffset > 0) {
- log.info("max cq physical offset is {}", maxCqPhysicOffset);
- this.currentMappedFileQueue.setFlushedWhere(maxCqPhysicOffset);
- this.currentMappedFileQueue.setCommittedWhere(maxCqPhysicOffset);
- this.currentMappedFileQueue.truncateDirtyFiles(maxCqPhysicOffset);
- }
+ private void initLogAndCq(boolean exitOk) throws IOException, RuntimeException {
+ current = new TopicPartitionLog(this);
+ current.init(exitOk);
}
- void sanityCheck() throws RuntimeException {
- List<MappedFile> mappedFileList = currentMappedFileQueue.getMappedFiles();
- for (MappedFile file : mappedFileList) {
- if (!currentBcq.containsOffsetFile(Long.parseLong(file.getFile().getName()))) {
- throw new RuntimeException("log file mismatch with consumeQueue file " + file.getFileName());
+
+ private boolean putMessageFromRemote(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ // split bytebuffer to avoid encode message again
+ while (byteBuffer.hasRemaining()) {
+ int mark = byteBuffer.position();
+ ByteBuffer bb = byteBuffer.slice();
+ int size = bb.getInt();
+ if (size < 0 || size > byteBuffer.capacity()) {
+ break;
+ } else {
+ bb.limit(size);
+ bb.rewind();
+ }
+
+ MessageExt messageExt = MessageDecoder.decode(bb, false, false);
+ long messageOffset = messageExt.getQueueOffset();
+ long minOffsetInQueue = getCQ().getMinOffsetInQueue();
+ if (getLog().isMappedFilesEmpty() || messageOffset < minOffsetInQueue) {
+ asyncPutMessage(bb, messageExt, replicating);
+ } else {
+ log.info("{}:{} message offset {} >= minOffsetInQueue {}, stop pull...",
+ topic, queueId, messageOffset, minOffsetInQueue);
+ return false;
}
+
+ byteBuffer.position(mark + size);
+ }
+
+ return true;
+
+ }
+
+ private void pullMessageFromMaster() throws Exception {
+
+ if (StringUtils.isBlank(compactionStore.getMasterAddr())) {
+ compactionStore.getCompactionSchedule().schedule(() -> {
+ try {
+ pullMessageFromMaster();
+ } catch (Exception e) {
+ log.error("pullMessageFromMaster exception: ", e);
+ }
+ }, 5, TimeUnit.SECONDS);
+ return;
+ }
+
+ replicating = new TopicPartitionLog(this, REPLICATING_SUB_FOLDER);
+ try (MessageFetcher messageFetcher = new MessageFetcher()) {
+ messageFetcher.pullMessageFromMaster(topic, queueId, getCQ().getMinOffsetInQueue(),
+ compactionStore.getMasterAddr(), (currOffset, response) -> {
+ if (currOffset < 0) {
+ log.info("{}:{} current offset {}, stop pull...", topic, queueId, currOffset);
+ return false;
+ }
+ return putMessageFromRemote(response.getBody());
+// positionMgr.setOffset(topic, queueId, currOffset);
+ });
}
- List<MappedFile> cqMappedFileList = currentBcq.getMappedFileQueue().getMappedFiles();
- for (MappedFile file: cqMappedFileList) {
- if (mappedFileList.stream().noneMatch(m -> Objects.equals(m.getFile().getName(), file.getFile().getName()))) {
- throw new RuntimeException("consumeQueue file mismatch with log file " + file.getFileName());
+ // merge files
+ if (getLog().isMappedFilesEmpty()) {
+ replaceFiles(getLog().getMappedFiles(), current, replicating);
+ } else if (replicating.getLog().isMappedFilesEmpty()) {
+ log.info("replicating message is empty"); //break
+ } else {
+ List<MappedFile> newFiles = Lists.newArrayList();
+ List<MappedFile> toCompactFiles = Lists.newArrayList(replicating.getLog().getMappedFiles());
+ putMessageLock.lock();
+ try {
+ // combine current and replicating to mappedFileList
+ newFiles = Lists.newArrayList(getLog().getMappedFiles());
+ toCompactFiles.addAll(newFiles); //all from current
+ current.roll(toCompactFiles.size() * compactionLogMappedFileSize);
+ } catch (Throwable e) {
+ log.error("roll log and cq exception: ", e);
+ } finally {
+ putMessageLock.unlock();
+ }
+
+ try {
+ // doCompaction with current and replicating
+ compactAndReplace(new FileList(toCompactFiles, newFiles));
+ } catch (Throwable e) {
+ log.error("do merge replicating and current exception: ", e);
}
}
+
+ // cleanReplicatingResource, force clean cq
+ replicating.clean(false, true);
+
+// positionMgr.setOffset(topic, queueId, currentPullOffset);
+ state.compareAndSet(State.INITIALIZING, State.NORMAL);
+ }
+ private void loadFromRemoteAsync() {
+ compactionStore.getCompactionSchedule().submit(() -> {
+ try {
+ pullMessageFromMaster();
+ } catch (Exception e) {
+ log.error("fetch message from master exception: ", e);
+ }
+ });
+
+ // update (currentStatus) = LOADING
+
+ // request => get (start, end)
+ // pull message => current message offset > end
+ // done
+ // positionMgr.persist();
+
+ // update (currentStatus) = RUNNING
}
private long nextOffsetCorrection(long oldOffset, long newOffset) {
@@ -220,28 +315,6 @@ public class CompactionLog {
return false;
}
- private boolean shouldRoll(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq, final int msgSize) {
- return mappedFileQueue.shouldRoll(msgSize) || bcq.shouldRoll();
- }
-
- private boolean isEmptyOrCurrentFileFull(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
- return mappedFileQueue.isEmptyOrCurrentFileFull() || bcq.getMappedFileQueue().isEmptyOrCurrentFileFull();
- }
-
- private void rollLogAndCq(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) throws IOException {
- MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
- if (mappedFile == null) {
- throw new IOException("create new file error");
- }
- long baseOffset = mappedFile.getFileFromOffset();
- MappedFile cqFile = bcq.createFile(baseOffset);
- if (cqFile == null) {
- mappedFile.destroy(1000);
- mappedFileQueue.getMappedFiles().remove(mappedFile);
- throw new IOException("create new consumeQueue file error");
- }
- }
-
public long rollNextFile(final long offset) {
return offset + compactionLogMappedFileSize - offset % compactionLogMappedFileSize;
}
@@ -263,28 +336,28 @@ public class CompactionLog {
}
public void checkAndPutMessage(final SelectMappedBufferResult selectMappedBufferResult, final MessageExt msgExt,
- final OffsetMap offsetMap, final MappedFileQueue mappedFileQueue, final SparseConsumeQueue bcq)
+ final OffsetMap offsetMap, final TopicPartitionLog tpLog)
throws DigestException {
if (shouldRetainMsg(msgExt, offsetMap)) {
- asyncPutMessage(selectMappedBufferResult, msgExt, mappedFileQueue, bcq);
+ asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
}
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult) {
- return asyncPutMessage(selectMappedBufferResult, currentMappedFileQueue, currentBcq);
+ return asyncPutMessage(selectMappedBufferResult, current);
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult,
- MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
+ final TopicPartitionLog tpLog) {
MessageExt msgExt = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer(), false, false);
- return asyncPutMessage(selectMappedBufferResult, msgExt, mappedFileQueue, bcq);
+ return asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
}
- public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult,
- final MessageExt msgExt, MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
+ public CompletableFuture<PutMessageResult> asyncPutMessage(final ByteBuffer msgBuffer,
+ final MessageExt msgExt, final TopicPartitionLog tpLog) {
// fix duplicate
- if (bcq.getMaxOffsetInQueue() >= msgExt.getQueueOffset()) {
+ if (tpLog.getCQ().getMaxOffsetInQueue() - 1 >= msgExt.getQueueOffset()) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
@@ -298,32 +371,32 @@ public class CompactionLog {
try {
long beginTime = System.nanoTime();
- if (isEmptyOrCurrentFileFull(mappedFileQueue, bcq)) {
+ if (tpLog.isEmptyOrCurrentFileFull()) {
try {
- rollLogAndCq(mappedFileQueue, bcq);
+ tpLog.roll();
} catch (IOException e) {
log.error("create mapped file or consumerQueue exception: ", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
}
- MappedFile mappedFile = mappedFileQueue.getLastMappedFile();
+ MappedFile mappedFile = tpLog.getLog().getLastMappedFile();
- CompactionAppendMsgCallback callback = new CompactionAppendMessageCallback(msgExt, bcq);
- AppendMessageResult result = mappedFile.appendMessage(selectMappedBufferResult.getByteBuffer(), callback);
+ CompactionAppendMsgCallback callback = new CompactionAppendMessageCallback(msgExt, tpLog.getCQ());
+ AppendMessageResult result = mappedFile.appendMessage(msgBuffer, callback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
try {
- rollLogAndCq(mappedFileQueue, bcq);
+ tpLog.roll();
} catch (IOException e) {
log.error("create mapped file2 error, topic: {}, clientAddr: {}", msgExt.getTopic(), msgExt.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
- mappedFile = mappedFileQueue.getLastMappedFile();
- result = mappedFile.appendMessage(selectMappedBufferResult.getByteBuffer(), callback);
+ mappedFile = tpLog.getLog().getLastMappedFile();
+ result = mappedFile.appendMessage(msgBuffer, callback);
break;
default:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
@@ -337,7 +410,7 @@ public class CompactionLog {
private SelectMappedBufferResult getMessage(final long offset, final int size) {
- MappedFile mappedFile = this.currentMappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+ MappedFile mappedFile = this.getLog().findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % compactionLogMappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
@@ -345,6 +418,13 @@ public class CompactionLog {
return null;
}
+ private boolean validateCqUnit(CqUnit cqUnit) {
+ return cqUnit.getPos() >= 0
+ && cqUnit.getSize() > 0
+ && cqUnit.getQueueOffset() >= 0
+ && cqUnit.getBatchNum() > 0;
+ }
+
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums, final int maxTotalMsgSize) {
readMessageLock.lock();
@@ -358,9 +438,9 @@ public class CompactionLog {
GetMessageResult getResult = new GetMessageResult();
- final long maxOffsetPy = currentMappedFileQueue.getMaxOffset();
+ final long maxOffsetPy = getLog().getMaxOffset();
- SparseConsumeQueue consumeQueue = currentBcq;
+ SparseConsumeQueue consumeQueue = getCQ();
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
@@ -407,13 +487,11 @@ public class CompactionLog {
long nextPhyFileStartOffset = Long.MIN_VALUE;
while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
CqUnit cqUnit = bufferConsumeQueue.next();
- long offsetPy = cqUnit.getPos();
- int sizePy = cqUnit.getSize();
-
- if (offsetPy < 0) {
- // indicate that the mapped file ended
+ if (!validateCqUnit(cqUnit)) {
break;
}
+ long offsetPy = cqUnit.getPos();
+ int sizePy = cqUnit.getSize();
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
@@ -476,50 +554,60 @@ public class CompactionLog {
}
}
- List<MappedFile> getCompactionFile() {
- List<MappedFile> mappedFileList = Lists.newArrayList(currentMappedFileQueue.getMappedFiles());
+ FileList getCompactionFile() {
+ List<MappedFile> mappedFileList = Lists.newArrayList(getLog().getMappedFiles());
if (mappedFileList.size() < 2) {
- return Collections.emptyList();
+ return null;
}
+ List<MappedFile> toCompactFiles = mappedFileList.subList(0, mappedFileList.size() - 1);
+
//exclude the last writing file
- for (int i = mappedFileList.size() - 2; i >= 0; i--) {
+ List<MappedFile> newFiles = Lists.newArrayList();
+ for (int i = 0; i < mappedFileList.size() - 1; i++) {
MappedFile mf = mappedFileList.get(i);
- long maxQueueOffset = currentBcq.getMaxMsgOffsetFromFile(mf.getFile().getName());
- if (maxQueueOffset <= positionMgr.getOffset(topic, queueId)) {
- if (i < mappedFileList.size() - 2) {
- // next to end
- return mappedFileList.subList(i + 1, mappedFileList.size() - 2);
- } else {
- return Collections.emptyList();
- }
+ long maxQueueOffsetInFile = getCQ().getMaxMsgOffsetFromFile(mf.getFile().getName());
+ if (maxQueueOffsetInFile > positionMgr.getOffset(topic, queueId)) {
+ newFiles.add(mf);
}
}
- return Collections.emptyList();
+ if (newFiles.isEmpty()) {
+ return null;
+ }
+
+ return new FileList(toCompactFiles, newFiles);
+ }
+
+ void compactAndReplace(FileList compactFiles) throws Throwable {
+ if (compactFiles == null || compactFiles.isEmpty()) {
+ return;
+ }
+
+ long startTime = System.nanoTime();
+ OffsetMap offsetMap = getOffsetMap(compactFiles.newFiles);
+ compaction(compactFiles.toCompactFiles, offsetMap);
+ replaceFiles(compactFiles.toCompactFiles, current, compacting);
+ positionMgr.setOffset(topic, queueId, offsetMap.lastOffset);
+ positionMgr.persist();
+ compacting.clean(false, false);
+ log.info("this compaction elapsed {} milliseconds",
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
+
}
void doCompaction() {
- if (!compacting.compareAndSet(false, true)) {
- log.warn("compaction is running, skip this time.");
+ if (!state.compareAndSet(State.NORMAL, State.COMPACTING)) {
+ log.warn("compactionLog state is {}, skip this time", state.get());
return;
}
try {
- List<MappedFile> mappedFileList = getCompactionFile();
- if (CollectionUtils.isNotEmpty(mappedFileList)) {
- long startTime = System.nanoTime();
- OffsetMap offsetMap = getOffsetMap(mappedFileList);
- compaction(mappedFileList, offsetMap);
- replaceFiles(mappedFileList, currentMappedFileQueue, newMappedFileQueue, offsetMap);
- positionMgr.setOffset(topic, queueId, offsetMap.lastOffset);
- log.info("this compaction elapsed {} milliseconds",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
- }
+ compactAndReplace(getCompactionFile());
} catch (Throwable e) {
log.error("do compaction exception: ", e);
}
- compacting.set(false);
+ state.compareAndSet(State.COMPACTING, State.NORMAL);
}
protected OffsetMap getOffsetMap(List<MappedFile> mappedFileList) throws NoSuchAlgorithmException, DigestException {
@@ -535,7 +623,9 @@ public class CompactionLog {
MessageExt msg = MessageDecoder.decode(smb.getByteBuffer(), true, false);
if (msg != null) {
////get key & offset and put to offsetMap
- offsetMap.put(msg.getKeys(), msg.getQueueOffset());
+ if (msg.getQueueOffset() > positionMgr.getOffset(topic, queueId)) {
+ offsetMap.put(msg.getKeys(), msg.getQueueOffset());
+ }
} else {
// msg is null indicate that file is end
break;
@@ -561,10 +651,7 @@ public class CompactionLog {
}
protected void compaction(List<MappedFile> mappedFileList, OffsetMap offsetMap) throws DigestException {
- newMappedFileQueue = new MappedFileQueue(compactionLogFilePath + File.separator + SUB_FOLDER,
- compactionLogMappedFileSize, null);
- compactionBcq = new SparseConsumeQueue(topic, queueId, compactionCqFilePath,
- compactionCqMappedFileSize, defaultMessageStore, SUB_FOLDER);
+ compacting = new TopicPartitionLog(this, COMPACTING_SUB_FOLDER);
for (MappedFile mappedFile : mappedFileList) {
Iterator<SelectMappedBufferResult> iterator = mappedFile.iterator(0);
@@ -577,7 +664,7 @@ public class CompactionLog {
// file end
break;
} else {
- checkAndPutMessage(smb, msgExt, offsetMap, newMappedFileQueue, compactionBcq);
+ checkAndPutMessage(smb, msgExt, offsetMap, compacting);
}
} finally {
if (smb != null) {
@@ -586,20 +673,28 @@ public class CompactionLog {
}
}
}
- putEndMessage(newMappedFileQueue);
+ putEndMessage(compacting.getLog());
}
- private void replaceFiles(List<MappedFile> mappedFileList, MappedFileQueue current,
- MappedFileQueue compacted, OffsetMap offsetMap) {
+ protected void replaceFiles(List<MappedFile> mappedFileList, TopicPartitionLog current,
+ TopicPartitionLog newLog) {
+
+ MappedFileQueue dest = current.getLog();
+ MappedFileQueue src = newLog.getLog();
long beginTime = System.nanoTime();
- List<String> fileNameToReplace = mappedFileList.stream()
- .map(m -> m.getFile().getName())
+// List<String> fileNameToReplace = mappedFileList.stream()
+// .map(m -> m.getFile().getName())
+// .collect(Collectors.toList());
+
+ List<String> fileNameToReplace = dest.getMappedFiles().stream()
+ .filter(mappedFileList::contains)
+ .map(mf -> mf.getFile().getName())
.collect(Collectors.toList());
mappedFileList.forEach(MappedFile::renameToDelete);
- compacted.getMappedFiles().forEach(mappedFile -> {
+ src.getMappedFiles().forEach(mappedFile -> {
try {
mappedFile.moveToParent();
} catch (IOException e) {
@@ -607,27 +702,28 @@ public class CompactionLog {
}
});
- current.getMappedFiles().stream()
+ dest.getMappedFiles().stream()
.filter(m -> !mappedFileList.contains(m))
- .forEach(m -> compacted.getMappedFiles().add(m));
+ .forEach(m -> src.getMappedFiles().add(m));
readMessageLock.lock();
try {
mappedFileList.forEach(mappedFile -> mappedFile.destroy(1000));
- current.getMappedFiles().clear();
- current.getMappedFiles().addAll(compacted.getMappedFiles());
+ dest.getMappedFiles().clear();
+ dest.getMappedFiles().addAll(src.getMappedFiles());
+ src.getMappedFiles().clear();
- replaceCqFiles(currentBcq, compactionBcq, fileNameToReplace);
+ replaceCqFiles(getCQ(), newLog.getCQ(), fileNameToReplace);
- log.info("replace file elapsed {} millisecs",
+ log.info("replace file elapsed {} milliseconds",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
} finally {
readMessageLock.unlock();
}
}
- private void replaceCqFiles(SparseConsumeQueue currentBcq, SparseConsumeQueue compactionBcq,
+ protected void replaceCqFiles(SparseConsumeQueue currentBcq, SparseConsumeQueue compactionBcq,
List<String> fileNameToReplace) {
long beginTime = System.nanoTime();
@@ -653,21 +749,27 @@ public class CompactionLog {
currentMq.getMappedFiles().clear();
currentMq.getMappedFiles().addAll(compactMq.getMappedFiles());
+ compactMq.getMappedFiles().clear();
+
currentBcq.refresh();
log.info("replace consume queue file elapsed {} millsecs.",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
}
- public SparseConsumeQueue getCurrentBcq() {
- return currentBcq;
+ public MappedFileQueue getLog() {
+ return current.mappedFileQueue;
}
- public SparseConsumeQueue getCompactionBcq() {
- return compactionBcq;
+ public SparseConsumeQueue getCQ() {
+ return current.consumeQueue;
}
- public void flushCq(int flushLeastPages) {
- currentBcq.flush(flushLeastPages);
+// public SparseConsumeQueue getCompactionScq() {
+// return compactionScq;
+// }
+
+ public void flushCQ(int flushLeastPages) {
+ getCQ().flush(flushLeastPages);
}
static class CompactionAppendEndMsgCallback implements CompactionAppendMsgCallback {
@@ -831,4 +933,168 @@ public class CompactionLog {
}
}
+    /**
+     * Pairs the mapped log files of one topic queue with the sparse consume queue built over them.
+     * Both queues are created in the constructor; {@link #init(boolean)} loads and validates them.
+     */
+    static class TopicPartitionLog {
+        MappedFileQueue mappedFileQueue;
+        SparseConsumeQueue consumeQueue;
+
+        /**
+         * Creates the log and consume queue directly under the compaction paths (no sub folder).
+         *
+         * @param compactionLog owner supplying paths, file sizes, topic, queue id and message store
+         */
+        public TopicPartitionLog(CompactionLog compactionLog) {
+            this(compactionLog, null);
+        }
+
+        /**
+         * Creates the log and consume queue, optionally placed in a sub folder.
+         *
+         * @param compactionLog owner supplying paths, file sizes, topic, queue id and message store
+         * @param subFolder     sub folder name; when blank the plain compaction paths are used
+         */
+        public TopicPartitionLog(CompactionLog compactionLog, String subFolder) {
+            final boolean inSubFolder = !StringUtils.isBlank(subFolder);
+            final String logDir = inSubFolder
+                ? compactionLog.compactionLogFilePath + File.separator + subFolder
+                : compactionLog.compactionLogFilePath;
+
+            mappedFileQueue = new MappedFileQueue(logDir, compactionLog.compactionLogMappedFileSize, null);
+            consumeQueue = inSubFolder
+                ? new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
+                    compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
+                    compactionLog.defaultMessageStore, subFolder)
+                : new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
+                    compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
+                    compactionLog.defaultMessageStore);
+        }
+
+        /** Shuts down the log queue and then the consume queue's file queue, 30 seconds each. */
+        public void shutdown() {
+            mappedFileQueue.shutdown(1000 * 30);
+            consumeQueue.getMappedFileQueue().shutdown(1000 * 30);
+        }
+
+        /**
+         * Loads both queues, recovers them and checks that their files match.
+         * On any failure the queues are shut down before the exception propagates.
+         *
+         * @param exitOk not read by this method
+         * @throws IOException      if the log or the consume queue cannot be loaded
+         * @throws RuntimeException if recovery or the sanity check fails
+         */
+        public void init(boolean exitOk) throws IOException, RuntimeException {
+            if (!mappedFileQueue.load()) {
+                throw shutdownAndFail("load log exception");
+            }
+            if (!consumeQueue.load()) {
+                throw shutdownAndFail("load consume queue exception");
+            }
+
+            try {
+                consumeQueue.recover();
+                recover();
+                sanityCheck();
+            } catch (Exception e) {
+                shutdown();
+                throw e;
+            }
+        }
+
+        /** Shuts both queues down and returns the exception for the caller to throw. */
+        private IOException shutdownAndFail(String message) {
+            shutdown();
+            return new IOException(message);
+        }
+
+        /**
+         * Aligns the log queue with the max physical offset reported by the consume queue:
+         * sets flushed/committed positions and truncates files beyond it. Skipped when the
+         * reported offset is not positive.
+         */
+        private void recover() {
+            final long maxCqPhysicOffset = consumeQueue.getMaxPhyOffsetInLog();
+            log.info("{}:{} max physical offset in compaction log is {}",
+                consumeQueue.getTopic(), consumeQueue.getQueueId(), maxCqPhysicOffset);
+            if (maxCqPhysicOffset <= 0) {
+                return;
+            }
+            this.mappedFileQueue.setFlushedWhere(maxCqPhysicOffset);
+            this.mappedFileQueue.setCommittedWhere(maxCqPhysicOffset);
+            this.mappedFileQueue.truncateDirtyFiles(maxCqPhysicOffset);
+        }
+
+        /**
+         * Verifies that every log file has a consume queue file of the same name and vice versa.
+         *
+         * @throws RuntimeException on the first file without a counterpart
+         */
+        void sanityCheck() throws RuntimeException {
+            final List<MappedFile> logFiles = mappedFileQueue.getMappedFiles();
+            for (MappedFile logFile : logFiles) {
+                // log file names are parsed as the numeric offset the consume queue is keyed by
+                long fileOffset = Long.parseLong(logFile.getFile().getName());
+                if (!consumeQueue.containsOffsetFile(fileOffset)) {
+                    throw new RuntimeException("log file mismatch with consumeQueue file " + logFile.getFileName());
+                }
+            }
+
+            final List<MappedFile> cqFiles = consumeQueue.getMappedFileQueue().getMappedFiles();
+            for (MappedFile cqFile : cqFiles) {
+                boolean hasLogFile = logFiles.stream()
+                    .anyMatch(m -> Objects.equals(m.getFile().getName(), cqFile.getFile().getName()));
+                if (!hasLogFile) {
+                    throw new RuntimeException("consumeQueue file mismatch with log file " + cqFile.getFileName());
+                }
+            }
+        }
+
+        /**
+         * Obtains the last log file via {@code getLastMappedFile(0)} and creates the consume
+         * queue file for its start offset.
+         *
+         * @throws IOException if either file cannot be created; a log file left without a
+         *                     consume queue file is destroyed and removed first
+         */
+        public synchronized void roll() throws IOException {
+            final MappedFile logFile = mappedFileQueue.getLastMappedFile(0);
+            if (logFile == null) {
+                throw new IOException("create new file error");
+            }
+            if (consumeQueue.createFile(logFile.getFileFromOffset()) == null) {
+                discard(logFile);
+                throw new IOException("create new consumeQueue file error");
+            }
+        }
+
+        /**
+         * Creates a log file and a consume queue file for the given base offset.
+         *
+         * @param baseOffset start offset of the new file pair
+         * @throws IOException if either file cannot be created; a log file left without a
+         *                     consume queue file is destroyed and removed first
+         */
+        public synchronized void roll(int baseOffset) throws IOException {
+            final MappedFile logFile = mappedFileQueue.tryCreateMappedFile(baseOffset);
+            if (logFile == null) {
+                throw new IOException("create new file error");
+            }
+            if (consumeQueue.createFile(baseOffset) == null) {
+                discard(logFile);
+                throw new IOException("create new consumeQueue file error");
+            }
+        }
+
+        /** Destroys a log file and drops it from the log queue. */
+        private void discard(MappedFile orphan) {
+            orphan.destroy(1000);
+            mappedFileQueue.getMappedFiles().remove(orphan);
+        }
+
+        /** @return true when either the log queue or the consume queue is empty or has a full current file */
+        public boolean isEmptyOrCurrentFileFull() {
+            if (mappedFileQueue.isEmptyOrCurrentFileFull()) {
+                return true;
+            }
+            return consumeQueue.getMappedFileQueue().isEmptyOrCurrentFileFull();
+        }
+
+        /**
+         * Destroys the given queue, refusing to do so while any of its mapped files still
+         * exists on disk.
+         *
+         * @param mappedFileQueue queue to clean
+         * @throws IOException if a mapped file of the queue still exists
+         */
+        public void clean(MappedFileQueue mappedFileQueue) throws IOException {
+            final List<MappedFile> files = mappedFileQueue.getMappedFiles();
+            for (MappedFile candidate : files) {
+                if (!candidate.getFile().exists()) {
+                    continue;
+                }
+                log.error("directory {} with {} not empty.", mappedFileQueue.getStorePath(), candidate.getFileName());
+                throw new IOException("directory " + mappedFileQueue.getStorePath() + " not empty.");
+            }
+
+            mappedFileQueue.destroy();
+        }
+
+        /**
+         * Cleans the log queue and then the consume queue's file queue.
+         *
+         * @param forceCleanLog destroy the log queue without the existence check
+         * @param forceCleanCq  destroy the consume queue files without the existence check
+         * @throws IOException if a non-forced clean finds a file that still exists
+         */
+        public void clean(boolean forceCleanLog, boolean forceCleanCq) throws IOException {
+            //clean and delete sub_folder
+            cleanQueue(mappedFileQueue, forceCleanLog);
+            cleanQueue(consumeQueue.getMappedFileQueue(), forceCleanCq);
+        }
+
+        /** Destroys the queue outright when forced, otherwise goes through the checked clean. */
+        private void cleanQueue(MappedFileQueue queue, boolean force) throws IOException {
+            if (force) {
+                queue.destroy();
+            } else {
+                clean(queue);
+            }
+        }
+
+        public MappedFileQueue getLog() {
+            return mappedFileQueue;
+        }
+
+        public SparseConsumeQueue getCQ() {
+            return consumeQueue;
+        }
+    }
+
+ static enum State {
+ NORMAL,
+ INITIALIZING,
+ COMPACTING,
+ }
+
+ static class FileList {
+ List<MappedFile> newFiles;
+ List<MappedFile> toCompactFiles;
+ public FileList(List<MappedFile> toCompactFiles, List<MappedFile> newFiles) {
+ this.toCompactFiles = toCompactFiles;
+ this.newFiles = newFiles;
+ }
+
+ boolean isEmpty() {
+ return CollectionUtils.isEmpty(newFiles) || CollectionUtils.isEmpty(toCompactFiles);
+ }
+ }
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
index c32bb1d3d..4181b34b8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
@@ -49,6 +49,10 @@ public class CompactionPositionMgr extends ConfigManager {
return queueOffsetMap.getOrDefault(topic + "_" + queueId, -1L);
}
+ public boolean isEmpty() {
+ return queueOffsetMap.isEmpty();
+ }
+
public boolean isCompaction(String topic, int queueId, long offset) {
return getOffset(topic, queueId) > offset;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
index 891aa1f0f..0aff597eb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
@@ -107,18 +107,32 @@ public class CompactionService extends ServiceThread {
}
}
- @Override
- public void start() {
- compactionStore.load();
- super.start();
+ public boolean load(boolean exitOK) {
+ try {
+ compactionStore.load(exitOK);
+ return true;
+ } catch (Exception e) {
+ log.error("load compaction store error ", e);
+ return false;
+ }
}
+// @Override
+// public void start() {
+// compactionStore.load();
+// super.start();
+// }
+
@Override
public void shutdown() {
super.shutdown();
compactionStore.shutdown();
}
+ public void updateMasterAddress(String addr) {
+ compactionStore.updateMasterAddress(addr);
+ }
+
static class TopicPartitionOffset {
String topic;
int queueId;
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index 805ee6d93..9e69505e4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -51,6 +51,7 @@ public class CompactionStore {
private final int compactionInterval;
private final int compactionThreadNum;
private final int offsetMapSize;
+ private String masterAddr;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -75,7 +76,7 @@ public class CompactionStore {
this.compactionInterval = defaultMessageStore.getMessageStoreConfig().getCompactionScheduleInternal();
}
- public void load() {
+ public void load(boolean exitOk) throws Exception {
File logRoot = new File(compactionLogPath);
File[] fileTopicList = logRoot.listFiles();
if (fileTopicList != null) {
@@ -95,16 +96,18 @@ public class CompactionStore {
int queueId = Integer.parseInt(fileQueueId.getName());
if (Files.isDirectory(Paths.get(compactionCqPath, topic, String.valueOf(queueId)))) {
- CompactionLog log = new CompactionLog(defaultMessageStore, topic, queueId,
- offsetMapSize, positionMgr, compactionLogPath, compactionCqPath);
+ CompactionLog log = new CompactionLog(defaultMessageStore, this, topic, queueId);
+ log.load(exitOk);
compactionLogTable.put(topic + "_" + queueId, log);
- compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.SECONDS);
+ compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
} else {
log.error("{}:{} compactionLog mismatch with compactionCq", topic, queueId);
}
} catch (Exception e) {
- log.warn("build bcq {}:{} exception: ",fileTopic.getName(), fileQueueId.getName(), e);
- continue;
+ log.error("load compactionLog {}:{} exception: ",
+ fileTopic.getName(), fileQueueId.getName(), e);
+ throw new Exception("load compactionLog " + fileTopic.getName()
+ + ":" + fileQueueId.getName() + " exception: " + e.getMessage());
}
}
}
@@ -114,21 +117,23 @@ public class CompactionStore {
}
public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
- compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
+ CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
if (v == null) {
try {
- v = new CompactionLog(defaultMessageStore, topic, queueId, offsetMapSize, positionMgr,
- compactionLogPath, compactionCqPath);
- compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.SECONDS);
+ v = new CompactionLog(defaultMessageStore,this, topic, queueId);
+ v.load(true);
+ compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
} catch (IOException e) {
log.error("create compactionLog exception: ", e);
return null;
}
}
-
- v.asyncPutMessage(smr);
return v;
});
+
+ if (clog != null) {
+ clog.asyncPutMessage(smr);
+ }
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
@@ -142,10 +147,12 @@ public class CompactionStore {
}
- public void flushCq(int flushLeastPages) {
- compactionLogTable.values().forEach(log -> {
- log.flushCq(flushLeastPages);
- });
+ public void flushCQ(int flushLeastPages) {
+ compactionLogTable.values().forEach(log -> log.flushCQ(flushLeastPages));
+ }
+
+ public void updateMasterAddress(String addr) {
+ this.masterAddr = addr;
}
public void shutdown() {
@@ -160,4 +167,29 @@ public class CompactionStore {
log.warn("wait compaction schedule shutdown interrupted. ");
}
}
+
+ public ScheduledExecutorService getCompactionSchedule() {
+ return compactionSchedule;
+ }
+
+ public String getCompactionLogPath() {
+ return compactionLogPath;
+ }
+
+ public String getCompactionCqPath() {
+ return compactionCqPath;
+ }
+
+ public CompactionPositionMgr getPositionMgr() {
+ return positionMgr;
+ }
+
+ public int getOffsetMapSize() {
+ return offsetMapSize;
+ }
+
+ public String getMasterAddr() {
+ return masterAddr;
+ }
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
new file mode 100644
index 000000000..5b6ac992e
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.kv;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Sets;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.io.IOException;
+import java.util.function.BiFunction;
+
+/**
+ * Pulls the messages of one topic queue from a master broker over a dedicated remoting client.
+ * The client is started in the constructor and shut down by {@link #close()}.
+ */
+public class MessageFetcher implements AutoCloseable {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    /** Timeout, in milliseconds, of every synchronous request sent to the master. */
+    private static final long INVOKE_TIMEOUT_MILLIS = 1000 * 30L;
+
+    private final RemotingClient client;
+
+    /** Creates and starts a Netty remoting client with TLS disabled. */
+    public MessageFetcher() {
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        nettyClientConfig.setUseTLS(false);
+        this.client = new NettyRemotingClient(nettyClientConfig);
+        this.client.start();
+    }
+
+    /** Shuts the remoting client down. */
+    @Override
+    public void close() throws IOException {
+        this.client.shutdown();
+    }
+
+    /** Consumer group used for both the heartbeat registration and the pull requests of a queue. */
+    private static String getConsumerGroup(String topic, int queueId) {
+        return String.join("-", topic, String.valueOf(queueId), "pull", "group");
+    }
+
+    /** Client id announced in the heartbeat and used again when unregistering. */
+    private static String getClientId() {
+        return String.join("@", RemotingUtil.getLocalAddress(), "compactionIns", "compactionUnit");
+    }
+
+    /** @return true when a response was received and carries {@code ResponseCode.SUCCESS} */
+    private static boolean isSuccess(RemotingCommand response) {
+        return response != null && response.getCode() == ResponseCode.SUCCESS;
+    }
+
+    /**
+     * Builds the header of one pull request: at most 10 messages, no byte limit,
+     * a 20 second suspend timeout and commit offset 0.
+     */
+    private PullMessageRequestHeader createPullMessageRequest(String topic, int queueId, long queueOffset, long subVersion) {
+        int sysFlag = PullSysFlag.buildSysFlag(false, false, false, false, true);
+
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup(getConsumerGroup(topic, queueId));
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setQueueOffset(queueOffset);
+        requestHeader.setMaxMsgNums(10);
+        requestHeader.setSysFlag(sysFlag);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(1000 * 20L);
+        requestHeader.setSubVersion(subVersion);
+        requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
+        return requestHeader;
+    }
+
+    /**
+     * Sends a heartbeat that announces a pull consumer of {@code groupName} subscribed to all
+     * messages of {@code topic}.
+     *
+     * @return true when the master answered with success
+     */
+    private boolean prepare(String masterAddr, String topic, String groupName, long subVersion)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
+        subscriptionData.setSubVersion(subVersion);
+
+        ConsumerData consumerData = new ConsumerData();
+        consumerData.setGroupName(groupName);
+        consumerData.setConsumeType(ConsumeType.CONSUME_ACTIVELY);
+        consumerData.setMessageModel(MessageModel.CLUSTERING);
+        consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumerData.setSubscriptionDataSet(Sets.newHashSet(subscriptionData));
+
+        HeartbeatData heartbeatData = new HeartbeatData();
+        heartbeatData.setClientID(getClientId());
+        heartbeatData.getConsumerDataSet().add(consumerData);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+        request.setLanguage(LanguageCode.JAVA);
+        request.setBody(heartbeatData.encode());
+
+        return isSuccess(client.invokeSync(masterAddr, request, INVOKE_TIMEOUT_MILLIS));
+    }
+
+    /**
+     * Unregisters the client announced by {@link #prepare} from the master.
+     *
+     * @return true when the master answered with success
+     */
+    private boolean pullDone(String masterAddr, String groupName)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
+        requestHeader.setClientID(getClientId());
+        requestHeader.setProducerGroup("");
+        requestHeader.setConsumerGroup(groupName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
+
+        return isSuccess(client.invokeSync(masterAddr, request, INVOKE_TIMEOUT_MILLIS));
+    }
+
+    /** An {@code endOffset} of -1 means "no upper bound"; otherwise stop once it is reached. */
+    private boolean stopPull(long currPullOffset, long endOffset) {
+        return currPullOffset >= endOffset && endOffset != -1;
+    }
+
+    /**
+     * Pulls the queue from offset 0 until {@code endOffset} is reached, the master reports
+     * that nothing more is available, or the handler asks to stop. Channels to the master are
+     * closed afterwards in every case.
+     *
+     * @param topic           topic to pull
+     * @param queueId         queue to pull
+     * @param endOffset       queue offset at which pulling stops, or -1 for no bound
+     * @param masterAddr      address of the master broker
+     * @param responseHandler called for every successful response with
+     *                        {@code nextBeginOffset - 1} and the response; returning false stops pulling
+     * @throws RemotingCommandException if registering with the master fails or a response has no header
+     * @throws Exception                on any remoting failure
+     */
+    public void pullMessageFromMaster(String topic, int queueId, long endOffset, String masterAddr,
+        BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
+        long currentPullOffset = 0;
+
+        try {
+            long subVersion = System.currentTimeMillis();
+            String groupName = getConsumerGroup(topic, queueId);
+            // Pulling on behalf of a group the master never registered cannot work; fail fast
+            // instead of ignoring the heartbeat result.
+            if (!prepare(masterAddr, topic, groupName, subVersion)) {
+                log.error("{}:{} prepare to {} pull message failed", topic, queueId, masterAddr);
+                throw new RemotingCommandException(topic + ":" + queueId + " prepare to " + masterAddr
+                    + " pull message failed");
+            }
+
+            boolean noNewMsg = false;
+            boolean keepPull = true;
+            while (!stopPull(currentPullOffset, endOffset)) {
+                PullMessageRequestHeader requestHeader = createPullMessageRequest(topic, queueId, currentPullOffset, subVersion);
+
+                RemotingCommand request =
+                    RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
+                RemotingCommand response = client.invokeSync(masterAddr, request, INVOKE_TIMEOUT_MILLIS);
+
+                PullMessageResponseHeader responseHeader = response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                if (responseHeader == null) {
+                    log.error("{}:{} pull message responseHeader is null", topic, queueId);
+                    throw new RemotingCommandException(topic + ":" + queueId + " pull message responseHeader is null");
+                }
+
+                // NOTE(review): every branch except SUCCESS and PULL_NOT_FOUND re-issues the
+                // request at the same offset without delay - confirm the master cannot keep
+                // answering that way indefinitely.
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS:
+                        long curOffset = responseHeader.getNextBeginOffset() - 1;
+                        keepPull = responseHandler.apply(curOffset, response);
+                        currentPullOffset = responseHeader.getNextBeginOffset();
+                        break;
+                    case ResponseCode.PULL_NOT_FOUND:       // NO_NEW_MSG, need break loop
+                        log.info("PULL_NOT_FOUND, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        noNewMsg = true;
+                        break;
+                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                        log.info("PULL_RETRY_IMMEDIATE, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        break;
+                    case ResponseCode.PULL_OFFSET_MOVED:
+                        log.info("PULL_OFFSET_MOVED, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        break;
+                    default:
+                        log.warn("Pull Message error, response code: {}, remark: {}",
+                            response.getCode(), response.getRemark());
+                }
+
+                if (noNewMsg || !keepPull) {
+                    break;
+                }
+            }
+            pullDone(masterAddr, groupName);
+        } finally {
+            if (client != null) {
+                client.closeChannels(Lists.newArrayList(masterAddr));
+            }
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index fd2c37e2d..79b745d89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
@@ -50,33 +51,62 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
@Override
public void recover() {
- MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
- if (lastMappedFile == null) {
- return;
- }
- ByteBuffer byteBuffer = lastMappedFile.sliceByteBuffer();
- int mappedFileOffset = 0;
- for (int i = 0; i < mappedFileSize; i += CQ_STORE_UNIT_SIZE) {
- byteBuffer.position(i);
- long offset = byteBuffer.getLong();
- int size = byteBuffer.getInt();
- byteBuffer.getLong(); //tagscode
- byteBuffer.getLong(); //timestamp
- long msgBaseOffset = byteBuffer.getLong();
- short batchSize = byteBuffer.getShort();
- if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
- mappedFileOffset += CQ_STORE_UNIT_SIZE;
- } else {
- log.info("Recover current batch consume queue file over, file:{} offset:{} size:{} msgBaseOffset:{} batchSize:{} mappedFileOffset:{}",
- lastMappedFile.getFileName(), offset, size, msgBaseOffset, batchSize, mappedFileOffset);
- break;
+ final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ if (!mappedFiles.isEmpty()) {
+ int index = mappedFiles.size() - 3;
+ if (index < 0) {
+ index = 0;
}
- }
- lastMappedFile.setWrotePosition(mappedFileOffset);
- lastMappedFile.setFlushedPosition(mappedFileOffset);
- lastMappedFile.setCommittedPosition(mappedFileOffset);
- reviseMaxAndMinOffsetInQueue();
+ MappedFile mappedFile = mappedFiles.get(index);
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ int mappedFileOffset = 0;
+ long processOffset = mappedFile.getFileFromOffset();
+ while (true) {
+ for (int i = 0; i < mappedFileSize; i += CQ_STORE_UNIT_SIZE) {
+ byteBuffer.position(i);
+ long offset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ byteBuffer.getLong(); //tagscode
+ byteBuffer.getLong(); //timestamp
+ long msgBaseOffset = byteBuffer.getLong();
+ short batchSize = byteBuffer.getShort();
+ if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
+ mappedFileOffset += CQ_STORE_UNIT_SIZE;
+ this.maxMsgPhyOffsetInCommitLog = offset;
+ } else {
+ log.info("Recover current batch consume queue file over, " + "file:{} offset:{} size:{} msgBaseOffset:{} batchSize:{} mappedFileOffset:{}",
+ mappedFile.getFileName(), offset, size, msgBaseOffset, batchSize, mappedFileOffset);
+
+ if (mappedFileOffset != mappedFileSize) {
+ mappedFile.setWrotePosition(mappedFileOffset);
+ mappedFile.setFlushedPosition(mappedFileOffset);
+ mappedFile.setCommittedPosition(mappedFileOffset);
+ }
+
+ break;
+ }
+ }
+
+ index++;
+ if (index >= mappedFiles.size()) {
+ log.info("Recover last batch consume queue file over, last mapped file:{} ", mappedFile.getFileName());
+ break;
+ } else {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mappedFileOffset = 0;
+ log.info("Recover next batch consume queue file: " + mappedFile.getFileName());
+ }
+ }
+
+ processOffset += mappedFileOffset;
+ mappedFileQueue.setFlushedWhere(processOffset);
+ mappedFileQueue.setCommittedWhere(processOffset);
+ mappedFileQueue.truncateDirtyFiles(processOffset);
+ reviseMaxAndMinOffsetInQueue();
+ }
}
public ReferredIterator<CqUnit> iterateFromOrNext(long startOffset) {
@@ -274,6 +304,39 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
.anyMatch(mf -> Objects.equals(mf.getFile().getName(), fileName));
}
+    /**
+     * Returns the physical end offset ({@code offset + size}) of the unit that
+     * {@code getMax} selects in the last mapped file.
+     *
+     * @return the end offset, or -1 when {@code getMax} finds no unit
+     */
+    public long getMaxPhyOffsetInLog() {
+        MappedFile lastMappedFile = mappedFileQueue.getLastMappedFile();
+        // Read relative to the buffer's position: absolute indexes 0 and 8 would address the
+        // first unit of the file whenever the buffer is handed over positioned, not sliced.
+        Long maxOffsetInLog = getMax(lastMappedFile,
+            b -> b.getLong(b.position()) + b.getInt(b.position() + 8));
+        if (maxOffsetInLog != null) {
+            return maxOffsetInLog;
+        } else {
+            return -1;
+        }
+    }
+
+    /**
+     * Scans the units of a mapped file backwards from its read position and applies
+     * {@code function} to the first unit whose offset, size, base offset and batch size
+     * are all valid.
+     *
+     * @param mappedFile file to scan; may be null
+     * @param function   receives a buffer whose index 0 is the first byte of the selected unit
+     * @param <T>        result type of the function
+     * @return the function result, or null when the file is null, holds less than one unit,
+     *         or contains no valid unit
+     */
+    private <T> T getMax(MappedFile mappedFile, Function<ByteBuffer, T> function) {
+        if (mappedFile == null || mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
+            return null;
+        }
+
+        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+        for (int i = mappedFile.getReadPosition() - CQ_STORE_UNIT_SIZE; i >= 0; i -= CQ_STORE_UNIT_SIZE) {
+            byteBuffer.position(i);
+            long offset = byteBuffer.getLong();
+            int size = byteBuffer.getInt();
+            byteBuffer.getLong();   //tagscode
+            byteBuffer.getLong();   //timestamp
+            long msgBaseOffset = byteBuffer.getLong();
+            short batchSize = byteBuffer.getShort();
+            if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
+                byteBuffer.position(i);     //reset position
+                // Slice so that absolute reads in the callback (getLong(0), getInt(8)) address
+                // this unit instead of the first unit of the file.
+                return function.apply(byteBuffer.slice());
+            }
+        }
+
+        return null;
+    }
+
@Override
protected BatchOffsetIndex getMaxMsgOffset(MappedFile mappedFile, boolean getBatchSize, boolean getStoreTime) {
if (mappedFile == null || mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
index 45f05c33b..9da8a03e3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.MessageExtEncoder;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -60,8 +61,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static org.apache.rocketmq.store.kv.CompactionLog.COMPACTING_SUB_FOLDER;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -112,6 +116,7 @@ public class CompactionLogTest {
defaultMessageStore = mock(DefaultMessageStore.class);
doReturn(storeConfig).when(defaultMessageStore).getMessageStoreConfig();
positionMgr = mock(CompactionPositionMgr.class);
+ doReturn(-1L).when(positionMgr).getOffset(topic, queueId);
}
static int queueOffset = 0;
@@ -122,7 +127,7 @@ public class CompactionLogTest {
msg.setTags(System.currentTimeMillis() + "TAG");
msg.setKeys(String.valueOf(queueOffset % keyCount));
msg.setBody(RandomStringUtils.randomAlphabetic(100).getBytes(StandardCharsets.UTF_8));
- msg.setQueueId(queueOffset % 8);
+ msg.setQueueId(0);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
@@ -144,15 +149,15 @@ public class CompactionLogTest {
MappedFileQueue smfq = mock(MappedFileQueue.class);
SparseConsumeQueue scq = mock(SparseConsumeQueue.class);
doReturn(smfq).when(scq).getMappedFileQueue();
- CompactionLog clog = mock(CompactionLog.class);
- FieldUtils.writeField(clog, "currentMappedFileQueue", mfq, true);
- FieldUtils.writeField(clog, "currentBcq", scq, true);
+ CompactionLog.TopicPartitionLog tpLog = mock(CompactionLog.TopicPartitionLog.class);
+ FieldUtils.writeField(tpLog, "mappedFileQueue", mfq, true);
+ FieldUtils.writeField(tpLog, "consumeQueue", scq, true);
doReturn(Lists.newArrayList()).when(mfq).getMappedFiles();
doReturn(Lists.newArrayList()).when(smfq).getMappedFiles();
- doCallRealMethod().when(clog).sanityCheck();
- clog.sanityCheck();
+ doCallRealMethod().when(tpLog).sanityCheck();
+ tpLog.sanityCheck();
}
@Test(expected = RuntimeException.class)
@@ -161,9 +166,9 @@ public class CompactionLogTest {
MappedFileQueue smfq = mock(MappedFileQueue.class);
SparseConsumeQueue scq = mock(SparseConsumeQueue.class);
doReturn(smfq).when(scq).getMappedFileQueue();
- CompactionLog clog = mock(CompactionLog.class);
- FieldUtils.writeField(clog, "currentMappedFileQueue", mfq, true);
- FieldUtils.writeField(clog, "currentBcq", scq, true);
+ CompactionLog.TopicPartitionLog tpLog = mock(CompactionLog.TopicPartitionLog.class);
+ FieldUtils.writeField(tpLog, "mappedFileQueue", mfq, true);
+ FieldUtils.writeField(tpLog, "consumeQueue", scq, true);
Files.createDirectories(Paths.get(logPath, topic, String.valueOf(queueId)));
Files.write(Paths.get(logPath, topic, String.valueOf(queueId), "102400"),
@@ -175,8 +180,8 @@ public class CompactionLogTest {
doReturn(Lists.newArrayList(mappedFile)).when(mfq).getMappedFiles();
doReturn(Lists.newArrayList()).when(smfq).getMappedFiles();
- doCallRealMethod().when(clog).sanityCheck();
- clog.sanityCheck();
+ doCallRealMethod().when(tpLog).sanityCheck();
+ tpLog.sanityCheck();
}
@Test
@@ -184,8 +189,8 @@ public class CompactionLogTest {
Iterator<SelectMappedBufferResult> iterator = mock(Iterator.class);
SelectMappedBufferResult smb = mock(SelectMappedBufferResult.class);
when(iterator.hasNext()).thenAnswer((Answer<Boolean>)invocationOnMock -> queueOffset < 1024);
- when(iterator.next()).thenReturn(smb);
- when(smb.getByteBuffer()).thenAnswer((Answer<ByteBuffer>)invocation -> buildMessage());
+ when(iterator.next()).thenAnswer((Answer<SelectMappedBufferResult>)invocation ->
+ new SelectMappedBufferResult(0, buildMessage(), 0, null));
MappedFile mf = mock(MappedFile.class);
List<MappedFile> mappedFileList = Lists.newArrayList(mf);
@@ -198,6 +203,7 @@ public class CompactionLogTest {
CompactionLog clog = mock(CompactionLog.class);
FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true);
doCallRealMethod().when(clog).getOffsetMap(any());
+ FieldUtils.writeField(clog, "positionMgr", positionMgr, true);
queueOffset = 0;
CompactionLog.OffsetMap offsetMap = clog.getOffsetMap(mappedFileList);
@@ -205,12 +211,12 @@ public class CompactionLogTest {
doCallRealMethod().when(clog).compaction(any(List.class), any(CompactionLog.OffsetMap.class));
doNothing().when(clog).putEndMessage(any(MappedFileQueue.class));
- doCallRealMethod().when(clog).checkAndPutMessage(any(SelectMappedBufferResult.class), any(MessageExt.class), any(
- CompactionLog.OffsetMap.class), any(MappedFileQueue.class), any(SparseConsumeQueue.class));
+ doCallRealMethod().when(clog).checkAndPutMessage(any(SelectMappedBufferResult.class),
+ any(MessageExt.class), any(CompactionLog.OffsetMap.class), any(CompactionLog.TopicPartitionLog.class));
doCallRealMethod().when(clog).shouldRetainMsg(any(MessageExt.class), any(CompactionLog.OffsetMap.class));
List<MessageExt> compactResult = Lists.newArrayList();
- when(clog.asyncPutMessage(any(SelectMappedBufferResult.class), any(MessageExt.class),
- any(MappedFileQueue.class), any(SparseConsumeQueue.class)))
+ when(clog.asyncPutMessage(any(ByteBuffer.class), any(MessageExt.class),
+ any(CompactionLog.TopicPartitionLog.class)))
.thenAnswer((Answer<CompletableFuture< PutMessageResult >>)invocation -> {
compactResult.add(invocation.getArgument(1));
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK,
@@ -223,4 +229,36 @@ public class CompactionLogTest {
assertEquals(1023, compactResult.stream().mapToLong(MessageExt::getQueueOffset).max().orElse(0));
}
+    /** Verifies that replaceFiles leaves dest holding one file whose name is outside COMPACTING_SUB_FOLDER. */
+    @Test
+    public void testReplaceFiles() throws IOException, IllegalAccessException {
+        CompactionLog clog = mock(CompactionLog.class);
+        doCallRealMethod().when(clog).replaceFiles(anyList(), any(CompactionLog.TopicPartitionLog.class),
+            any(CompactionLog.TopicPartitionLog.class));
+        doCallRealMethod().when(clog).replaceCqFiles(any(SparseConsumeQueue.class),
+            any(SparseConsumeQueue.class), anyList());
+        // A mock has no initialized fields; readMessageLock is presumably taken by replaceFiles - confirm.
+        FieldUtils.writeField(clog, "readMessageLock", new PutMessageSpinLock(), true);
+
+        List<MappedFile> destFiles = Lists.newArrayList();
+        MappedFileQueue destMFQ = mock(MappedFileQueue.class);
+        when(destMFQ.getMappedFiles()).thenReturn(destFiles);
+        CompactionLog.TopicPartitionLog dest = mock(CompactionLog.TopicPartitionLog.class);
+        when(dest.getLog()).thenReturn(destMFQ);
+
+        // Source side: a single non-mock DefaultMappedFile whose path lies under COMPACTING_SUB_FOLDER.
+        String fileName = logPath + File.separator + COMPACTING_SUB_FOLDER + File.separator + String.format("%010d", 0);
+        MappedFile mf = new DefaultMappedFile(fileName, 1024);
+        List<MappedFile> srcFiles = Lists.newArrayList(mf);
+        MappedFileQueue srcMFQ = mock(MappedFileQueue.class);
+        when(srcMFQ.getMappedFiles()).thenReturn(srcFiles);
+        CompactionLog.TopicPartitionLog src = mock(CompactionLog.TopicPartitionLog.class);
+        when(src.getLog()).thenReturn(srcMFQ);
+
+        clog.replaceFiles(Lists.newArrayList(), dest, src);
+        assertEquals(1, destFiles.size());
+        destFiles.forEach(f -> assertFalse(f.getFileName().contains(COMPACTING_SUB_FOLDER)));
+    }
+
}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 9c1af641b..a4544544b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
+import org.apache.rocketmq.tools.command.message.DumpCompactionLogCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
@@ -247,6 +248,7 @@ public class MQAdminStartup {
initCommand(new ExportMetricsCommand());
initCommand(new HAStatusSubCommand());
+ initCommand(new DumpCompactionLogCommand());
}
private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
new file mode 100644
index 000000000..b1c8c33cb
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Admin sub-command that decodes a compaction log file and prints every message it contains.
+ */
+public class DumpCompactionLogCommand implements SubCommand {
+    /** Size in bytes of the length prefix that starts every stored message. */
+    private static final int SIZE_FIELD_LENGTH = 4;
+
+    @Override
+    public String commandDesc() {
+        return "parse compaction log to message";
+    }
+
+    @Override
+    public String commandName() {
+        return "dumpCompactionLog";
+    }
+
+    /**
+     * Registers the optional {@code -f/--file} option that names the log file to dump.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("f", "file", true, "to dump file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Prints each message of the file given by {@code -f} to standard output, stopping at the
+     * first record that cannot be decoded. Without {@code -f} only a hint is printed.
+     *
+     * @throws SubCommandException if the file is missing, is a directory, or cannot be read
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        if (!commandLine.hasOption("f")) {
+            System.out.print("miss dump log file name\n");
+            return;
+        }
+
+        String fileName = commandLine.getOptionValue("f");
+        Path filePath = Paths.get(fileName);
+        if (!Files.exists(filePath)) {
+            throw new SubCommandException("file " + fileName + " not exist.");
+        }
+        if (Files.isDirectory(filePath)) {
+            throw new SubCommandException("file " + fileName + " is a directory.");
+        }
+
+        // Open and map read-only: dumping must neither need write permission nor touch the log.
+        // try-with-resources closes the file and channel; the mapping is released in the finally.
+        try (RandomAccessFile file = new RandomAccessFile(fileName, "r");
+            FileChannel fileChannel = file.getChannel()) {
+            ByteBuffer buf = fileChannel.map(MapMode.READ_ONLY, 0, fileChannel.size());
+            try {
+                dumpMessages(buf);
+            } finally {
+                UtilAll.cleanBuffer(buf);
+            }
+        } catch (IOException e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        }
+    }
+
+    /**
+     * Walks the length-prefixed records in {@code buf} from offset 0 and prints each decoded message.
+     */
+    private static void dumpMessages(ByteBuffer buf) {
+        int current = 0;
+        while (buf.capacity() - current >= SIZE_FIELD_LENGTH) {
+            int size = buf.getInt(current);
+            // A non-positive size would never advance; an oversized one would overrun the buffer.
+            if (size <= 0 || size > buf.capacity() - current) {
+                break;
+            }
+
+            buf.position(current);
+            ByteBuffer bb = buf.slice();
+            bb.limit(size);
+
+            MessageExt messageExt = MessageDecoder.decode(bb, false, false);
+            if (messageExt == null) {
+                break;
+            }
+            // The message is an argument, never the format string: its text may contain '%'.
+            System.out.printf("%s\n", messageExt);
+            current += size;
+        }
+    }
+}