You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/15 07:53:28 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when broker role change (#4460)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new d9273e70f Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when broker role change (#4460)
d9273e70f is described below
commit d9273e70f649af7a13d8e8443e8059c4ac1c530d
Author: rongtong <ji...@163.com>
AuthorDate: Wed Jun 15 15:53:10 2022 +0800
Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when the broker role changes (#4460)
* Update the logic of confirmOffset and recover topicQueueTable to prevent message loss
* Do not dispatch when reputFromOffset + size is greater than confirmOffset to prevent message loss
* Pass the unit tests (UT)
* Pass the unit tests (UT)
---
.../broker/hacontroller/ReplicasManager.java | 6 ++--
.../controller/impl/DLedgerController.java | 3 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 25 ++++++++--------
.../store/ha/autoswitch/AutoSwitchHAClient.java | 14 ++-------
.../store/ha/autoswitch/AutoSwitchHAService.java | 35 +++++++++++++++-------
.../store/ha/autoswitch/AutoSwitchHATest.java | 4 +++
.../test/autoswitchrole/AutoSwitchRoleBase.java | 1 +
.../AutoSwitchRoleIntegrationTest.java | 7 ++++-
8 files changed, 55 insertions(+), 40 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 65b0d3fc3..c11129ca7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -147,7 +147,8 @@ public class ReplicasManager {
this.scheduledService.shutdown();
}
- public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch, final int syncStateSetEpoch, final long brokerId) {
+ public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch,
+ final int syncStateSetEpoch, final long brokerId) {
if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch);
@@ -170,7 +171,6 @@ public class ReplicasManager {
final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add(this.localAddress);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
- schedulingCheckSyncStateSet();
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
@@ -182,6 +182,8 @@ public class ReplicasManager {
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
this.brokerController.changeSpecialServiceStatus(true);
+ schedulingCheckSyncStateSet();
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 67f293ad1..0027eea36 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -424,9 +424,8 @@ public class DLedgerController implements Controller {
} catch (final Throwable e) {
log.error("Error happen when controller leader append initial request to dledger", e);
tryTimes++;
- if (tryTimes > 2) {
+ if (tryTimes % 3 == 0) {
log.warn("Controller leader append initial log failed too many times, please wait a while");
- tryTimes = 0;
}
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8dc2e9a3a..5b5173d72 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -616,6 +616,8 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService.shutdown();
+ long oldReputFromOffset = this.reputMessageService.getReputFromOffset();
+
// truncate commitLog
this.commitLog.truncateDirtyFiles(offsetToTruncate);
@@ -625,7 +627,7 @@ public class DefaultMessageStore implements MessageStore {
recoverTopicQueueTable();
this.reputMessageService = new ReputMessageService();
- this.reputMessageService.setReputFromOffset(offsetToTruncate);
+ this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
this.reputMessageService.start();
}
@@ -1394,10 +1396,7 @@ public class DefaultMessageStore implements MessageStore {
@Override
public long getConfirmOffset() {
if (this.brokerConfig.isEnableControllerMode()) {
- long confirmOffset = ((AutoSwitchHAService)this.haService).getConfirmOffset();
- if (confirmOffset > 0) {
- return confirmOffset;
- }
+ return ((AutoSwitchHAService) this.haService).getConfirmOffset();
}
return this.commitLog.getConfirmOffset();
}
@@ -2407,7 +2406,7 @@ public class DefaultMessageStore implements MessageStore {
}
public long behind() {
- return DefaultMessageStore.this.commitLog.getConfirmOffset() - this.reputFromOffset;
+ return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
}
private boolean isCommitLogAvailable() {
@@ -2415,7 +2414,7 @@ public class DefaultMessageStore implements MessageStore {
return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
}
if (DefaultMessageStore.this.getBrokerConfig().isEnableControllerMode()) {
- return this.reputFromOffset <= ((AutoSwitchHAService)DefaultMessageStore.this.haService).getConfirmOffset();
+ return this.reputFromOffset < ((AutoSwitchHAService) DefaultMessageStore.this.haService).getConfirmOffset();
}
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
@@ -2428,21 +2427,21 @@ public class DefaultMessageStore implements MessageStore {
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
- if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
- && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
- break;
- }
-
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
- for (int readSize = 0; readSize < result.getSize() && reputFromOffset <= DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
+ if (reputFromOffset + size > DefaultMessageStore.this.getConfirmOffset()) {
+ doNext = false;
+ break;
+ }
+
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 104b50fa0..25b87a248 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -93,11 +93,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
*/
private volatile long currentReceivedEpoch;
- /**
- * Confirm offset = min(localMaxOffset, master confirm offset).
- */
- private volatile long confirmOffset;
-
public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
EpochFileCache epochCache) throws IOException {
this.haService = haService;
@@ -126,9 +121,9 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
this.currentReceivedEpoch = -1;
this.currentReportedOffset = 0;
this.processPosition = 0;
- this.confirmOffset = -1;
this.lastReadTimestamp = System.currentTimeMillis();
this.lastWriteTimestamp = System.currentTimeMillis();
+ haService.updateConfirmOffset(-1);
}
public void reOpen() throws IOException {
@@ -189,10 +184,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return this.currentState;
}
- public long getConfirmOffset() {
- return confirmOffset;
- }
-
@Override public void changeCurrentState(HAConnectionState haConnectionState) {
LOGGER.info("change state to {}", haConnectionState);
this.currentState = haConnectionState;
@@ -503,7 +494,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
if (bodySize > 0) {
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
}
- AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+ haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
if (!reportSlaveMaxOffset()) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c0ced3158..775ce9b6b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -53,14 +53,13 @@ public class AutoSwitchHAService extends DefaultHAService {
private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
- private volatile long confirmOffset;
+ private volatile long confirmOffset = -1;
private String localAddress;
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
-
public AutoSwitchHAService() {
}
@@ -107,6 +106,9 @@ public class AutoSwitchHAService extends DefaultHAService {
// Truncate dirty file
final long truncateOffset = truncateInvalidMsg();
+
+ updateConfirmOffset(computeConfirmOffset());
+
if (truncateOffset >= 0) {
this.epochCache.truncateSuffixByOffset(truncateOffset);
}
@@ -118,6 +120,17 @@ public class AutoSwitchHAService extends DefaultHAService {
}
this.epochCache.appendEntry(newEpochEntry);
+ // Waiting consume queue dispatch
+ while (defaultMessageStore.dispatchBehindBytes() > 0) {
+ try {
+ Thread.sleep(100);
+ } catch (Exception ignored) {
+
+ }
+ }
+
+ LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
+
this.defaultMessageStore.recoverTopicQueueTable();
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
@@ -193,7 +206,6 @@ public class AutoSwitchHAService extends DefaultHAService {
return newSyncStateSet;
}
-
/**
* Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >=
* current confirmOffset, and it is caught up to an offset within the current leader epoch.
@@ -220,10 +232,10 @@ public class AutoSwitchHAService extends DefaultHAService {
}
/**
- * Get confirm offset (min slaveAckOffset of all syncStateSet members)
+ * Get confirm offset (min slaveAckOffset of all syncStateSet members) for master
*/
public long getConfirmOffset() {
- if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) {
+ if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
if (this.syncStateSet.size() == 1) {
return this.defaultMessageStore.getMaxPhyOffset();
}
@@ -231,11 +243,8 @@ public class AutoSwitchHAService extends DefaultHAService {
if (this.confirmOffset <= 0) {
this.confirmOffset = computeConfirmOffset();
}
- return this.confirmOffset;
- } else if (this.haClient != null) {
- return this.haClient.getConfirmOffset();
}
- return -1;
+ return confirmOffset;
}
public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
@@ -244,11 +253,15 @@ public class AutoSwitchHAService extends DefaultHAService {
}
}
+ public void updateConfirmOffset(long confirmOffset) {
+ this.confirmOffset = confirmOffset;
+ }
+
private long computeConfirmOffset() {
final Set<String> currentSyncStateSet = getSyncStateSet();
long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
for (HAConnection connection : this.connectionList) {
- final String slaveAddress = ((AutoSwitchHAConnection)connection).getSlaveAddress();
+ final String slaveAddress = ((AutoSwitchHAConnection) connection).getSlaveAddress();
if (currentSyncStateSet.contains(slaveAddress)) {
confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
}
@@ -324,7 +337,7 @@ public class AutoSwitchHAService extends DefaultHAService {
}
}
- LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", reputFromOffset);
+ LOGGER.info("Truncate commitLog to {}", reputFromOffset);
this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
return reputFromOffset;
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index d4e690811..0fa9f295f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -82,6 +82,7 @@ public class AutoSwitchHATest {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
storeConfig1 = new MessageStoreConfig();
storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+ storeConfig1.setHaSendHeartbeatInterval(1000);
storeConfig1.setStorePathRootDir(storePathRootDir + File.separator + "broker1");
storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + "broker1" + File.separator + "commitlog");
storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + "broker1" + File.separator + "EpochFileCache");
@@ -92,6 +93,7 @@ public class AutoSwitchHATest {
storeConfig2 = new MessageStoreConfig();
storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+ storeConfig1.setHaSendHeartbeatInterval(1000);
storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + "broker2");
storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + "broker2" + File.separator + "commitlog");
storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + "broker2" + File.separator + "EpochFileCache");
@@ -106,6 +108,7 @@ public class AutoSwitchHATest {
storeConfig3 = new MessageStoreConfig();
storeConfig3.setBrokerRole(BrokerRole.SLAVE);
+ storeConfig1.setHaSendHeartbeatInterval(1000);
storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + "broker3");
storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + "broker3" + File.separator + "commitlog");
storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + "broker3" + File.separator + "EpochFileCache");
@@ -201,6 +204,7 @@ public class AutoSwitchHATest {
// Step1, set syncStateSet, if both broker1 and broker2 are in syncStateSet, the confirmOffset will be computed as the min slaveAckOffset(broker2's ack)
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001")));
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+ Thread.sleep(6000);
checkMessage(this.messageStore2, 10, 0);
final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 849788320..222996be3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -105,6 +105,7 @@ public class AutoSwitchRoleBase {
protected MessageStoreConfig buildMessageStoreConfig(final String brokerName, final int haPort,
final int mappedFileSize) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
+ storeConfig.setHaSendHeartbeatInterval(1000);
storeConfig.setBrokerRole(BrokerRole.SLAVE);
storeConfig.setHaListenPort(haPort);
storeConfig.setStorePathRootDir(storePathRootDir + File.separator + brokerName);
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 5b0f1947d..e6a99a1ec 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -80,6 +80,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
System.out.println("Begin test");
final MessageStore messageStore = brokerController1.getMessageStore();
putMessage(messageStore);
+ Thread.sleep(3000);
// Check slave message
checkMessage(brokerController2.getMessageStore(), 10, 0);
}
@@ -142,6 +143,8 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
final MessageStore messageStore = brokerController2.getMessageStore();
putMessage(messageStore);
+ Thread.sleep(3000);
+
// Check slave message
checkMessage(brokerController1.getMessageStore(), 20, 0);
}
@@ -153,11 +156,12 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
waitSlaveReady(broker3.getMessageStore());
- Thread.sleep(6000);
+ Thread.sleep(3000);
checkMessage(broker3.getMessageStore(), 10, 0);
putMessage(this.brokerController1.getMessageStore());
+ Thread.sleep(3000);
checkMessage(broker3.getMessageStore(), 20, 0);
}
@@ -167,6 +171,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
init(1700);
// Step1: Put message
putMessage(this.brokerController1.getMessageStore());
+ Thread.sleep(3000);
checkMessage(this.brokerController2.getMessageStore(), 10, 0);
// Step2: shutdown broker1, broker2 as master