You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/04 01:39:58 UTC
[rocketmq] branch develop updated: [ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b410557e8 [ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)
b410557e8 is described below
commit b410557e873ee4bdfa56561eb719a26007c1a3bb
Author: rongtong <ji...@163.com>
AuthorDate: Thu May 4 09:39:51 2023 +0800
[ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)
* Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover
---
.../apache/rocketmq/broker/BrokerController.java | 9 ++--
.../broker/controller/ReplicasManager.java | 8 ++--
.../controller/ReplicasManagerRegisterTest.java | 5 +-
.../broker/controller/ReplicasManagerTest.java | 4 ++
.../java/org/apache/rocketmq/store/CommitLog.java | 55 ++++++++++++++++------
.../apache/rocketmq/store/DefaultMessageStore.java | 9 ++--
.../org/apache/rocketmq/store/RunningFlags.java | 18 +++++--
.../org/apache/rocketmq/store/StoreCheckpoint.java | 12 +++++
.../store/ha/autoswitch/AutoSwitchHAClient.java | 3 +-
.../ha/autoswitch/AutoSwitchHAConnection.java | 2 +-
.../store/ha/autoswitch/AutoSwitchHAService.java | 35 +++-----------
.../store/ha/autoswitch/AutoSwitchHATest.java | 32 +++++++++++--
12 files changed, 124 insertions(+), 68 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a35618dc0..68c9d963b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -763,6 +763,11 @@ public class BrokerController {
LOG.error("BrokerController#initialize: unexpected error occurs", e);
}
}
+
+ if (this.brokerConfig.isEnableControllerMode()) {
+ this.replicasManager.setIsolatedAndBrokerPermission(false);
+ }
+
if (messageStore != null) {
registerMessageStoreHook();
result = result && this.messageStore.load();
@@ -1556,10 +1561,6 @@ public class BrokerController {
isIsolated = true;
}
- if (this.brokerConfig.isEnableControllerMode()) {
- this.replicasManager.setIsolatedAndBrokerPermission(false);
- }
-
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 3c7e061a2..005d6b3cb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -122,10 +122,6 @@ public class ReplicasManager {
this.tempBrokerMetadata = new TempBrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathBrokerIdentity() + "-temp");
}
- public long getConfirmOffset() {
- return this.haService.getConfirmOffset();
- }
-
enum State {
INITIAL,
FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
@@ -419,7 +415,7 @@ public class ReplicasManager {
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer(), this.getLastEpoch(),
this.brokerController.getMessageStore().getMaxPhyOffset(),
- this.getConfirmOffset(),
+ this.brokerController.getMessageStore().getConfirmOffset(),
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
this.brokerConfig.getBrokerElectionPriority()
);
@@ -881,11 +877,13 @@ public class ReplicasManager {
if (isBrokerRoleConfirmed) {
this.brokerController.setIsolated(false);
this.brokerConfig.setBrokerPermission(this.originalBrokerPermission);
+ this.brokerController.getMessageStore().getRunningFlags().makeIsolated(false);
} else {
// prohibit writing and reading before confirming the broker role
this.brokerController.setIsolated(true);
this.originalBrokerPermission = this.brokerConfig.getBrokerPermission();
this.brokerConfig.setBrokerPermission(0);
+ this.brokerController.getMessageStore().getRunningFlags().makeIsolated(true);
}
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
index 7fb9d9aeb..d01a6f76f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBro
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
@@ -103,6 +104,8 @@ public class ReplicasManagerRegisterTest {
private AutoSwitchHAService mockedAutoSwitchHAService;
+ private RunningFlags runningFlags = new RunningFlags();
+
@Before
public void setUp() throws Exception {
UtilAll.deleteFile(new File(STORE_BASE_PATH));
@@ -116,8 +119,8 @@ public class ReplicasManagerRegisterTest {
when(mockedBrokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
when(mockedBrokerController.getTopicConfigManager()).thenReturn(mockedTopicConfigManager);
when(mockedMessageStore.getHaService()).thenReturn(mockedAutoSwitchHAService);
+ when(mockedMessageStore.getRunningFlags()).thenReturn(runningFlags);
when(mockedBrokerController.getSlaveSynchronize()).thenReturn(new SlaveSynchronize(mockedBrokerController));
-
when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR));
when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index e03828cff..c863f7ac9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextB
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.assertj.core.api.Assertions;
@@ -95,6 +96,8 @@ public class ReplicasManagerTest {
private SyncStateSet syncStateSet;
+ private RunningFlags runningFlags = new RunningFlags();
+
private static final String OLD_MASTER_ADDRESS = "192.168.1.1";
private static final String NEW_MASTER_ADDRESS = "192.168.1.2";
@@ -150,6 +153,7 @@ public class ReplicasManagerTest {
when(defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(brokerController.getMessageStore().getHaService()).thenReturn(autoSwitchHAService);
+ when(brokerController.getMessageStore().getRunningFlags()).thenReturn(runningFlags);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getSlaveSynchronize()).thenReturn(slaveSynchronize);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 75b4042dc..18cc32179 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
/**
@@ -337,10 +338,19 @@ public class CommitLog implements Swappable {
}
processOffset += mappedFileOffset;
- // Set a candidate confirm offset.
- // In most cases, this value will be overwritten by confirmLog.init.
- // It works if some confirmed messages are lost.
- this.setConfirmOffset(lastValidMsgPhyOffset);
+
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+ if (this.defaultMessageStore.getConfirmOffset() < this.defaultMessageStore.getMinPhyOffset()) {
+ log.error("confirmOffset {} is less than minPhyOffset {}, correct confirmOffset to minPhyOffset", this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMinPhyOffset());
+ this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
+ } else if (this.defaultMessageStore.getConfirmOffset() > processOffset) {
+ log.error("confirmOffset {} is larger than lastValidMsgPhyOffset {}, correct confirmOffset to processOffset", this.defaultMessageStore.getConfirmOffset(), processOffset);
+ this.defaultMessageStore.setConfirmOffset(processOffset);
+ }
+ } else {
+ this.setConfirmOffset(lastValidMsgPhyOffset);
+ }
+
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
@@ -544,7 +554,18 @@ public class CommitLog implements Swappable {
}
public long getConfirmOffset() {
- if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+ if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isIsolated()) {
+ if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+ return this.defaultMessageStore.getMaxPhyOffset();
+ }
+ // First time compute confirmOffset.
+ if (this.confirmOffset <= 0) {
+ setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
+ }
+ }
+ return this.confirmOffset;
+ } else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
return this.confirmOffset;
} else {
return getMaxOffset();
@@ -553,6 +574,7 @@ public class CommitLog implements Swappable {
public void setConfirmOffset(long phyOffset) {
this.confirmOffset = phyOffset;
+ this.defaultMessageStore.getStoreCheckpoint().setConfirmPhyOffset(confirmOffset);
}
public long getLastFileFromOffset() {
@@ -605,8 +627,8 @@ public class CommitLog implements Swappable {
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
mappedFileOffset += size;
- if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
- if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+ if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() || this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+ if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getCommitLog().getConfirmOffset()) {
this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
}
} else {
@@ -644,10 +666,17 @@ public class CommitLog implements Swappable {
}
processOffset += mappedFileOffset;
- // Set a candidate confirm offset.
- // In most cases, this value will be overwritten by confirmLog.init.
- // It works if some confirmed messages are lost.
- this.setConfirmOffset(lastValidMsgPhyOffset);
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+ if (this.defaultMessageStore.getConfirmOffset() < this.defaultMessageStore.getMinPhyOffset()) {
+ log.error("confirmOffset {} is less than minPhyOffset {}, correct confirmOffset to minPhyOffset", this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMinPhyOffset());
+ this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
+ } else if (this.defaultMessageStore.getConfirmOffset() > processOffset) {
+ log.error("confirmOffset {} is larger than lastValidMsgPhyOffset {}, correct confirmOffset to processOffset", this.defaultMessageStore.getConfirmOffset(), processOffset);
+ this.defaultMessageStore.setConfirmOffset(processOffset);
+ }
+ } else {
+ this.setConfirmOffset(lastValidMsgPhyOffset);
+ }
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
@@ -744,7 +773,7 @@ public class CommitLog implements Swappable {
// dynamically adjust maxMessageSize, but not support DLedger mode temporarily
int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
if (newMaxMessageSize >= 10 &&
- putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+ putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
}
}
@@ -956,7 +985,6 @@ public class CommitLog implements Swappable {
int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
boolean needHandleHA = needHandleHA(messageExtBatch);
-
if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
@@ -1829,7 +1857,6 @@ public class CommitLog implements Swappable {
}
-
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e1bdc6e71..ca8f30684 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -335,11 +335,14 @@ public class DefaultMessageStore implements MessageStore {
new StoreCheckpoint(
StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
+ setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
+
result = this.indexService.load(lastExitOK);
this.recover(lastExitOK);
LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
}
+
long maxOffset = this.getMaxPhyOffset();
this.setBrokerInitMaxOffset(maxOffset);
LOGGER.info("load over, and the max phy offset = {}", maxOffset);
@@ -1589,9 +1592,6 @@ public class DefaultMessageStore implements MessageStore {
@Override
public long getConfirmOffset() {
- if (this.brokerConfig.isEnableControllerMode()) {
- return ((AutoSwitchHAService) this.haService).getConfirmOffset();
- }
return this.commitLog.getConfirmOffset();
}
@@ -2747,9 +2747,6 @@ public class DefaultMessageStore implements MessageStore {
}
public boolean isCommitLogAvailable() {
- if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
- return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
- }
return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 7ff11a282..6a0ef5a5f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -28,6 +28,8 @@ public class RunningFlags {
private static final int DISK_FULL_BIT = 1 << 4;
+ private static final int ISOLATED_BIT = 1 << 5;
+
private volatile int flagBits = 0;
public RunningFlags() {
@@ -46,11 +48,11 @@ public class RunningFlags {
}
public boolean isReadable() {
- if ((this.flagBits & NOT_READABLE_BIT) == 0) {
- return true;
- }
+ return (this.flagBits & NOT_READABLE_BIT) == 0;
+ }
- return false;
+ public boolean isIsolated() {
+ return (this.flagBits & ISOLATED_BIT) != 0;
}
public boolean getAndMakeNotReadable() {
@@ -98,6 +100,14 @@ public class RunningFlags {
this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
}
+ public void makeIsolated(boolean isolated) {
+ if (isolated) {
+ this.flagBits |= ISOLATED_BIT;
+ } else {
+ this.flagBits &= ~ISOLATED_BIT;
+ }
+ }
+
public boolean isLogicsQueueError() {
if ((this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT) {
return true;
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index a06aa2853..1e2504a2b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -37,6 +37,7 @@ public class StoreCheckpoint {
private volatile long logicsMsgTimestamp = 0;
private volatile long indexMsgTimestamp = 0;
private volatile long masterFlushedOffset = 0;
+ private volatile long confirmPhyOffset = 0;
public StoreCheckpoint(final String scpPath) throws IOException {
File file = new File(scpPath);
@@ -53,6 +54,7 @@ public class StoreCheckpoint {
this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
this.masterFlushedOffset = this.mappedByteBuffer.getLong(24);
+ this.confirmPhyOffset = this.mappedByteBuffer.getLong(32);
log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
@@ -61,6 +63,7 @@ public class StoreCheckpoint {
log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
log.info("store checkpoint file masterFlushedOffset " + this.masterFlushedOffset);
+ log.info("store checkpoint file confirmPhyOffset " + this.confirmPhyOffset);
} else {
log.info("store checkpoint file not exists, " + scpPath);
}
@@ -84,6 +87,7 @@ public class StoreCheckpoint {
this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
this.mappedByteBuffer.putLong(24, this.masterFlushedOffset);
+ this.mappedByteBuffer.putLong(32, this.confirmPhyOffset);
this.mappedByteBuffer.force();
}
@@ -103,6 +107,14 @@ public class StoreCheckpoint {
this.logicsMsgTimestamp = logicsMsgTimestamp;
}
+ public long getConfirmPhyOffset() {
+ return confirmPhyOffset;
+ }
+
+ public void setConfirmPhyOffset(long confirmPhyOffset) {
+ this.confirmPhyOffset = confirmPhyOffset;
+ }
+
public long getMinTimestampIndex() {
return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 2ef225e69..936db0c4c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -168,7 +168,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
this.processPosition = 0;
this.lastReadTimestamp = System.currentTimeMillis();
this.lastWriteTimestamp = System.currentTimeMillis();
- haService.updateConfirmOffset(-1);
}
public void reOpen() throws IOException {
@@ -565,7 +564,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
}
- haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
+ haService.getDefaultMessageStore().setConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 60710f143..440cd3c7a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -547,7 +547,7 @@ public class AutoSwitchHAConnection implements HAConnection {
// EpochStartOffset
this.byteBufferHeader.putLong(entry.getStartOffset());
// Additional info(confirm offset)
- final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+ final long confirmOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getConfirmOffset();
this.byteBufferHeader.putLong(confirmOffset);
this.byteBufferHeader.flip();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index d1e623ca7..75ef622ec 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -69,7 +69,6 @@ public class AutoSwitchHAService extends DefaultHAService {
// Indicate whether the syncStateSet is currently in the process of being synchronized to controller.
private volatile boolean isSynchronizingSyncStateSet = false;
- private volatile long confirmOffset = -1;
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
@@ -128,7 +127,7 @@ public class AutoSwitchHAService extends DefaultHAService {
// Truncate dirty file
final long truncateOffset = truncateInvalidMsg();
- updateConfirmOffset(computeConfirmOffset());
+ this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
if (truncateOffset >= 0) {
this.epochCache.truncateSuffixByOffset(truncateOffset);
@@ -155,7 +154,7 @@ public class AutoSwitchHAService extends DefaultHAService {
defaultMessageStore.getTransientStorePool().setRealCommit(true);
}
- LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
+ LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
this.defaultMessageStore.recoverTopicQueueTable();
this.defaultMessageStore.setStateMachineVersion(masterEpoch);
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
@@ -309,7 +308,7 @@ public class AutoSwitchHAService extends DefaultHAService {
if (currentSyncStateSet.contains(slaveBrokerId)) {
return;
}
- final long confirmOffset = getConfirmOffset();
+ final long confirmOffset = this.defaultMessageStore.getConfirmOffset();
if (slaveMaxOffset >= confirmOffset) {
final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
@@ -346,27 +345,11 @@ public class AutoSwitchHAService extends DefaultHAService {
this.connectionCaughtUpTimeTable.put(slaveBrokerId, Math.max(prevTime, lastCaughtUpTimeMs));
}
- /**
- * Get confirm offset (min slaveAckOffset of all syncStateSet members) for master
- */
- public long getConfirmOffset() {
- if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
- if (getLocalSyncStateSet().size() == 1) {
- return this.defaultMessageStore.getMaxPhyOffset();
- }
- // First time compute confirmOffset.
- if (this.confirmOffset <= 0) {
- this.confirmOffset = computeConfirmOffset();
- }
- }
- return confirmOffset;
- }
-
public void updateConfirmOffsetWhenSlaveAck(final Long slaveBrokerId) {
this.readLock.lock();
try {
if (this.syncStateSet.contains(slaveBrokerId)) {
- this.confirmOffset = computeConfirmOffset();
+ this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
}
} finally {
this.readLock.unlock();
@@ -425,11 +408,7 @@ public class AutoSwitchHAService extends DefaultHAService {
return info;
}
- public void updateConfirmOffset(long confirmOffset) {
- this.confirmOffset = confirmOffset;
- }
-
- private long computeConfirmOffset() {
+ public long computeConfirmOffset() {
final Set<Long> currentSyncStateSet = getSyncStateSet();
long newConfirmOffset = this.defaultMessageStore.getMaxPhyOffset();
List<Long> idList = this.connectionList.stream().map(connection -> ((AutoSwitchHAConnection)connection).getSlaveId()).collect(Collectors.toList());
@@ -439,7 +418,7 @@ public class AutoSwitchHAService extends DefaultHAService {
for (Long syncId : currentSyncStateSet) {
if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
- return this.confirmOffset;
+ return this.defaultMessageStore.getConfirmOffset();
}
}
@@ -458,7 +437,7 @@ public class AutoSwitchHAService extends DefaultHAService {
markSynchronizingSyncStateSetDone();
this.syncStateSet.clear();
this.syncStateSet.addAll(syncStateSet);
- this.confirmOffset = computeConfirmOffset();
+ this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
} finally {
this.writeLock.unlock();
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 6d105289f..19b4c8eb7 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.StoreCheckpoint;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -214,7 +215,7 @@ public class AutoSwitchHATest {
assertTrue(masterAndPutMessage);
checkMessage(this.messageStore2, 10, 0);
- final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
+ final long confirmOffset = this.messageStore1.getConfirmOffset();
// Step2, shutdown store2
this.messageStore2.shutdown();
@@ -224,16 +225,18 @@ public class AutoSwitchHATest {
assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
// The confirmOffset still don't change, because syncStateSet contains broker2, but broker2 shutdown
- assertEquals(confirmOffset, ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset());
+ assertEquals(confirmOffset, this.messageStore1.getConfirmOffset());
// Step3, shutdown store1, start store2, change store2 to master, epoch = 2
this.messageStore1.shutdown();
storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
messageStore2 = buildMessageStore(storeConfig2, 2L);
+ messageStore2.getRunningFlags().makeIsolated(true);
assertTrue(messageStore2.load());
messageStore2.start();
messageStore2.getHaService().changeToMaster(2);
+ messageStore2.getRunningFlags().makeIsolated(false);
((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(2L)));
// Put message on master
@@ -461,7 +464,7 @@ public class AutoSwitchHATest {
// Step2: check flag SynchronizingSyncStateSet
Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
- Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
+ Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
Set<Long> syncStateSet = masterHAService.getSyncStateSet();
Assert.assertEquals(syncStateSet.size(), 2);
Assert.assertTrue(syncStateSet.contains(1L));
@@ -475,6 +478,29 @@ public class AutoSwitchHATest {
Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
}
+ @Test
+ public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception {
+ init(defaultMappedFileSize);
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L)));
+ changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+ checkMessage(this.messageStore2, 10, 0);
+
+ long tmpConfirmOffset = this.messageStore2.getConfirmOffset();
+ long setConfirmOffset = this.messageStore2.getConfirmOffset() - this.messageStore2.getConfirmOffset() / 2;
+ messageStore2.shutdown();
+ StoreCheckpoint storeCheckpoint = new StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + "checkpoint");
+ assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset());
+ storeCheckpoint.setConfirmPhyOffset(setConfirmOffset);
+ storeCheckpoint.shutdown();
+ messageStore2 = buildMessageStore(storeConfig2, 2L);
+ messageStore2.getRunningFlags().makeIsolated(true);
+ assertTrue(messageStore2.load());
+ messageStore2.start();
+ messageStore2.getRunningFlags().makeIsolated(false);
+ assertEquals(setConfirmOffset, messageStore2.getConfirmOffset());
+ checkMessage(this.messageStore2, 5, 0);
+ }
+
@After
public void destroy() throws Exception {
if (this.messageStore2 != null) {