You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/15 07:53:28 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when the broker role changes (#4460)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new d9273e70f Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when broker role change (#4460)
d9273e70f is described below

commit d9273e70f649af7a13d8e8443e8059c4ac1c530d
Author: rongtong <ji...@163.com>
AuthorDate: Wed Jun 15 15:53:10 2022 +0800

    Update the logic of confirmOffset and recover topicQueueTable to prevent message loss when the broker role changes (#4460)
    
    * Update the logic of confirmOffset and recover topicQueueTable to prevent message loss
    
    * Do not dispatch when reputFromOffset + size is greater than confirmOffset to prevent message loss
    
    * Make the unit tests pass
    
    * Make the unit tests pass
---
 .../broker/hacontroller/ReplicasManager.java       |  6 ++--
 .../controller/impl/DLedgerController.java         |  3 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 25 ++++++++--------
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 14 ++-------
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 35 +++++++++++++++-------
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  4 +++
 .../test/autoswitchrole/AutoSwitchRoleBase.java    |  1 +
 .../AutoSwitchRoleIntegrationTest.java             |  7 ++++-
 8 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 65b0d3fc3..c11129ca7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -147,7 +147,8 @@ public class ReplicasManager {
         this.scheduledService.shutdown();
     }
 
-    public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch, final int syncStateSetEpoch, final long brokerId) {
+    public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch,
+        final int syncStateSetEpoch, final long brokerId) {
         if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
             if (StringUtils.equals(newMasterAddress, this.localAddress)) {
                 changeToMaster(newMasterEpoch, syncStateSetEpoch);
@@ -170,7 +171,6 @@ public class ReplicasManager {
                 final HashSet<String> newSyncStateSet = new HashSet<>();
                 newSyncStateSet.add(this.localAddress);
                 changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
-                schedulingCheckSyncStateSet();
 
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
@@ -182,6 +182,8 @@ public class ReplicasManager {
                 this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
                 this.brokerController.changeSpecialServiceStatus(true);
 
+                schedulingCheckSyncStateSet();
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 67f293ad1..0027eea36 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -424,9 +424,8 @@ public class DLedgerController implements Controller {
                             } catch (final Throwable e) {
                                 log.error("Error happen when controller leader append initial request to dledger", e);
                                 tryTimes++;
-                                if (tryTimes > 2) {
+                                if (tryTimes % 3 == 0) {
                                     log.warn("Controller leader append initial log failed too many times, please wait a while");
-                                    tryTimes = 0;
                                 }
                             }
                         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8dc2e9a3a..5b5173d72 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -616,6 +616,8 @@ public class DefaultMessageStore implements MessageStore {
 
         this.reputMessageService.shutdown();
 
+        long oldReputFromOffset = this.reputMessageService.getReputFromOffset();
+
         // truncate commitLog
         this.commitLog.truncateDirtyFiles(offsetToTruncate);
 
@@ -625,7 +627,7 @@ public class DefaultMessageStore implements MessageStore {
         recoverTopicQueueTable();
 
         this.reputMessageService = new ReputMessageService();
-        this.reputMessageService.setReputFromOffset(offsetToTruncate);
+        this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
         this.reputMessageService.start();
     }
 
@@ -1394,10 +1396,7 @@ public class DefaultMessageStore implements MessageStore {
     @Override
     public long getConfirmOffset() {
         if (this.brokerConfig.isEnableControllerMode()) {
-            long confirmOffset = ((AutoSwitchHAService)this.haService).getConfirmOffset();
-            if (confirmOffset > 0) {
-                return confirmOffset;
-            }
+            return ((AutoSwitchHAService) this.haService).getConfirmOffset();
         }
         return this.commitLog.getConfirmOffset();
     }
@@ -2407,7 +2406,7 @@ public class DefaultMessageStore implements MessageStore {
         }
 
         public long behind() {
-            return DefaultMessageStore.this.commitLog.getConfirmOffset() - this.reputFromOffset;
+            return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
         }
 
         private boolean isCommitLogAvailable() {
@@ -2415,7 +2414,7 @@ public class DefaultMessageStore implements MessageStore {
                 return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
             }
             if (DefaultMessageStore.this.getBrokerConfig().isEnableControllerMode()) {
-                return this.reputFromOffset <= ((AutoSwitchHAService)DefaultMessageStore.this.haService).getConfirmOffset();
+                return this.reputFromOffset < ((AutoSwitchHAService) DefaultMessageStore.this.haService).getConfirmOffset();
             }
             return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
         }
@@ -2428,21 +2427,21 @@ public class DefaultMessageStore implements MessageStore {
             }
             for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
 
-                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
-                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
-                    break;
-                }
-
                 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                 if (result != null) {
                     try {
                         this.reputFromOffset = result.getStartOffset();
 
-                        for (int readSize = 0; readSize < result.getSize() && reputFromOffset <= DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                             DispatchRequest dispatchRequest =
                                 DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
                             int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
 
+                            if (reputFromOffset + size > DefaultMessageStore.this.getConfirmOffset()) {
+                                doNext = false;
+                                break;
+                            }
+
                             if (dispatchRequest.isSuccess()) {
                                 if (size > 0) {
                                     DefaultMessageStore.this.doDispatch(dispatchRequest);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 104b50fa0..25b87a248 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -93,11 +93,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
      */
     private volatile long currentReceivedEpoch;
 
-    /**
-     * Confirm offset = min(localMaxOffset, master confirm offset).
-     */
-    private volatile long confirmOffset;
-
     public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
         EpochFileCache epochCache) throws IOException {
         this.haService = haService;
@@ -126,9 +121,9 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         this.currentReceivedEpoch = -1;
         this.currentReportedOffset = 0;
         this.processPosition = 0;
-        this.confirmOffset = -1;
         this.lastReadTimestamp = System.currentTimeMillis();
         this.lastWriteTimestamp = System.currentTimeMillis();
+        haService.updateConfirmOffset(-1);
     }
 
     public void reOpen() throws IOException {
@@ -189,10 +184,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         return this.currentState;
     }
 
-    public long getConfirmOffset() {
-        return confirmOffset;
-    }
-
     @Override public void changeCurrentState(HAConnectionState haConnectionState) {
         LOGGER.info("change state to {}", haConnectionState);
         this.currentState = haConnectionState;
@@ -503,7 +494,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                     if (bodySize > 0) {
                                         AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
                                     }
-                                    AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+                                    haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
 
                                     if (!reportSlaveMaxOffset()) {
                                         LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c0ced3158..775ce9b6b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -53,14 +53,13 @@ public class AutoSwitchHAService extends DefaultHAService {
     private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
     private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
     private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
-    private volatile long confirmOffset;
+    private volatile long confirmOffset = -1;
 
     private String localAddress;
 
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
-
     public AutoSwitchHAService() {
     }
 
@@ -107,6 +106,9 @@ public class AutoSwitchHAService extends DefaultHAService {
 
         // Truncate dirty file
         final long truncateOffset = truncateInvalidMsg();
+
+        updateConfirmOffset(computeConfirmOffset());
+
         if (truncateOffset >= 0) {
             this.epochCache.truncateSuffixByOffset(truncateOffset);
         }
@@ -118,6 +120,17 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
         this.epochCache.appendEntry(newEpochEntry);
 
+        // Waiting consume queue dispatch
+        while (defaultMessageStore.dispatchBehindBytes() > 0) {
+            try {
+                Thread.sleep(100);
+            } catch (Exception ignored) {
+
+            }
+        }
+
+        LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
+
         this.defaultMessageStore.recoverTopicQueueTable();
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
         return true;
@@ -193,7 +206,6 @@ public class AutoSwitchHAService extends DefaultHAService {
         return newSyncStateSet;
     }
 
-
     /**
      * Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >=
      * current confirmOffset, and it is caught up to an offset within the current leader epoch.
@@ -220,10 +232,10 @@ public class AutoSwitchHAService extends DefaultHAService {
     }
 
     /**
-     * Get confirm offset (min slaveAckOffset of all syncStateSet members)
+     * Get confirm offset (min slaveAckOffset of all syncStateSet members) for master
      */
     public long getConfirmOffset() {
-        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) {
+        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
             if (this.syncStateSet.size() == 1) {
                 return this.defaultMessageStore.getMaxPhyOffset();
             }
@@ -231,11 +243,8 @@ public class AutoSwitchHAService extends DefaultHAService {
             if (this.confirmOffset <= 0) {
                 this.confirmOffset = computeConfirmOffset();
             }
-            return this.confirmOffset;
-        } else if (this.haClient != null) {
-            return this.haClient.getConfirmOffset();
         }
-        return -1;
+        return confirmOffset;
     }
 
     public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
@@ -244,11 +253,15 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
     }
 
+    public void updateConfirmOffset(long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
     private long computeConfirmOffset() {
         final Set<String> currentSyncStateSet = getSyncStateSet();
         long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
         for (HAConnection connection : this.connectionList) {
-            final String slaveAddress = ((AutoSwitchHAConnection)connection).getSlaveAddress();
+            final String slaveAddress = ((AutoSwitchHAConnection) connection).getSlaveAddress();
             if (currentSyncStateSet.contains(slaveAddress)) {
                 confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
             }
@@ -324,7 +337,7 @@ public class AutoSwitchHAService extends DefaultHAService {
             }
         }
 
-        LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", reputFromOffset);
+        LOGGER.info("Truncate commitLog to {}", reputFromOffset);
         this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
         return reputFromOffset;
     }
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index d4e690811..0fa9f295f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -82,6 +82,7 @@ public class AutoSwitchHATest {
         BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
         storeConfig1 = new MessageStoreConfig();
         storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+        storeConfig1.setHaSendHeartbeatInterval(1000);
         storeConfig1.setStorePathRootDir(storePathRootDir + File.separator + "broker1");
         storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + "broker1" + File.separator + "commitlog");
         storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + "broker1" + File.separator + "EpochFileCache");
@@ -92,6 +93,7 @@ public class AutoSwitchHATest {
 
         storeConfig2 = new MessageStoreConfig();
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig1.setHaSendHeartbeatInterval(1000);
         storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + "broker2");
         storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + "broker2" + File.separator + "commitlog");
         storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + "broker2" + File.separator + "EpochFileCache");
@@ -106,6 +108,7 @@ public class AutoSwitchHATest {
 
         storeConfig3 = new MessageStoreConfig();
         storeConfig3.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig1.setHaSendHeartbeatInterval(1000);
         storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + "broker3");
         storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + "broker3" + File.separator + "commitlog");
         storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + "broker3" + File.separator + "EpochFileCache");
@@ -201,6 +204,7 @@ public class AutoSwitchHATest {
         // Step1, set syncStateSet, if both broker1 and broker2 are in syncStateSet, the confirmOffset will be computed as the min slaveAckOffset(broker2's ack)
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001")));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        Thread.sleep(6000);
         checkMessage(this.messageStore2, 10, 0);
 
         final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 849788320..222996be3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -105,6 +105,7 @@ public class AutoSwitchRoleBase {
     protected MessageStoreConfig buildMessageStoreConfig(final String brokerName, final int haPort,
         final int mappedFileSize) {
         MessageStoreConfig storeConfig = new MessageStoreConfig();
+        storeConfig.setHaSendHeartbeatInterval(1000);
         storeConfig.setBrokerRole(BrokerRole.SLAVE);
         storeConfig.setHaListenPort(haPort);
         storeConfig.setStorePathRootDir(storePathRootDir + File.separator + brokerName);
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 5b0f1947d..e6a99a1ec 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -80,6 +80,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         System.out.println("Begin test");
         final MessageStore messageStore = brokerController1.getMessageStore();
         putMessage(messageStore);
+        Thread.sleep(3000);
         // Check slave message
         checkMessage(brokerController2.getMessageStore(), 10, 0);
     }
@@ -142,6 +143,8 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         final MessageStore messageStore = brokerController2.getMessageStore();
         putMessage(messageStore);
 
+        Thread.sleep(3000);
+
         // Check slave message
         checkMessage(brokerController1.getMessageStore(), 20, 0);
     }
@@ -153,11 +156,12 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
         BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
         waitSlaveReady(broker3.getMessageStore());
-        Thread.sleep(6000);
+        Thread.sleep(3000);
 
         checkMessage(broker3.getMessageStore(), 10, 0);
 
         putMessage(this.brokerController1.getMessageStore());
+        Thread.sleep(3000);
         checkMessage(broker3.getMessageStore(), 20, 0);
     }
 
@@ -167,6 +171,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         init(1700);
         // Step1: Put message
         putMessage(this.brokerController1.getMessageStore());
+        Thread.sleep(3000);
         checkMessage(this.brokerController2.getMessageStore(), 10, 0);
 
         // Step2: shutdown broker1, broker2 as master