You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/13 06:41:52 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Use confirm offset in ReputMessageService (#4449)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new f038cf95d [Summer of code] Use confirm offset in ReputMessageService (#4449)
f038cf95d is described below

commit f038cf95d72700419706861796eddf5c57f6b6b4
Author: hzh0425 <64...@qq.com>
AuthorDate: Mon Jun 13 14:41:28 2022 +0800

    [Summer of code] Use confirm offset in ReputMessageService (#4449)
    
    * use confirm offset when controller mode
    
    * fix bug
    
    * record confirmOffset as a variable
    
    * pass AutoSwitchHAService's test, add confirmOffset test.
    
    * re trigger ci
    
    * review
---
 .../apache/rocketmq/store/DefaultMessageStore.java |  9 +++
 .../store/ha/autoswitch/AutoSwitchHAClient.java    |  6 +-
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  1 +
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 31 ++++++++-
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 79 +++++++++++++++++++---
 5 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7c57cf11f..8dc2e9a3a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1393,6 +1393,12 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public long getConfirmOffset() {
+        if (this.brokerConfig.isEnableControllerMode()) {
+            long confirmOffset = ((AutoSwitchHAService)this.haService).getConfirmOffset();
+            if (confirmOffset > 0) {
+                return confirmOffset;
+            }
+        }
         return this.commitLog.getConfirmOffset();
     }
 
@@ -2408,6 +2414,9 @@ public class DefaultMessageStore implements MessageStore {
             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
                 return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
             }
+            if (DefaultMessageStore.this.getBrokerConfig().isEnableControllerMode()) {
+                return this.reputFromOffset <= ((AutoSwitchHAService)DefaultMessageStore.this.haService).getConfirmOffset();
+            }
             return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 997deb7e8..104b50fa0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -189,6 +189,10 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         return this.currentState;
     }
 
+    public long getConfirmOffset() {
+        return confirmOffset;
+    }
+
     @Override public void changeCurrentState(HAConnectionState haConnectionState) {
         LOGGER.info("change state to {}", haConnectionState);
         this.currentState = haConnectionState;
@@ -495,11 +499,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                         AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
                                         AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset));
                                     }
-                                    AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
 
                                     if (bodySize > 0) {
                                         AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
                                     }
+                                    AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
 
                                     if (!reportSlaveMaxOffset()) {
                                         LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 7d689b69a..d7bfff3b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -322,6 +322,7 @@ public class AutoSwitchHAConnection implements HAConnection {
                                 }
                                 byteBufferRead.position(readSocketPos);
                                 maybeExpandInSyncStateSet(slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveAddress);
                                 AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
                                 LOGGER.debug("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                 break;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 9fab640f2..c0ced3158 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.ha.DefaultHAService;
 import org.apache.rocketmq.store.ha.GroupTransferService;
 import org.apache.rocketmq.store.ha.HAClient;
@@ -52,11 +53,14 @@ public class AutoSwitchHAService extends DefaultHAService {
     private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
     private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
     private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
+    private volatile long confirmOffset;
+
     private String localAddress;
 
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
+
     public AutoSwitchHAService() {
     }
 
@@ -115,7 +119,6 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.epochCache.appendEntry(newEpochEntry);
 
         this.defaultMessageStore.recoverTopicQueueTable();
-
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
         return true;
     }
@@ -220,10 +223,33 @@ public class AutoSwitchHAService extends DefaultHAService {
      * Get confirm offset (min slaveAckOffset of all syncStateSet members)
      */
     public long getConfirmOffset() {
+        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) {
+            if (this.syncStateSet.size() == 1) {
+                return this.defaultMessageStore.getMaxPhyOffset();
+            }
+            // Compute confirmOffset the first time it is requested.
+            if (this.confirmOffset <= 0) {
+                this.confirmOffset = computeConfirmOffset();
+            }
+            return this.confirmOffset;
+        } else if (this.haClient != null) {
+            return this.haClient.getConfirmOffset();
+        }
+        return -1;
+    }
+
+    public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
+        if (this.syncStateSet.contains(slaveAddress)) {
+            this.confirmOffset = computeConfirmOffset();
+        }
+    }
+
+    private long computeConfirmOffset() {
         final Set<String> currentSyncStateSet = getSyncStateSet();
         long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
         for (HAConnection connection : this.connectionList) {
-            if (currentSyncStateSet.contains(connection.getClientAddress())) {
+            final String slaveAddress = ((AutoSwitchHAConnection)connection).getSlaveAddress();
+            if (currentSyncStateSet.contains(slaveAddress)) {
                 confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
             }
         }
@@ -233,6 +259,7 @@ public class AutoSwitchHAService extends DefaultHAService {
     public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
         this.syncStateSet.clear();
         this.syncStateSet.addAll(syncStateSet);
+        this.confirmOffset = computeConfirmOffset();
     }
 
     public synchronized Set<String> getSyncStateSet() {
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index dfbc35f81..d4e690811 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -121,6 +122,10 @@ public class AutoSwitchHATest {
         messageStore1.start();
         messageStore2.start();
         messageStore3.start();
+
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+        ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+        ((AutoSwitchHAService) this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002");
     }
 
     public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Exception {
@@ -154,6 +159,9 @@ public class AutoSwitchHATest {
         assertTrue(messageStore2.load());
         messageStore1.start();
         messageStore2.start();
+
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+        ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
     }
 
     private void changeMasterAndPutMessage(DefaultMessageStore master, MessageStoreConfig masterConfig,
@@ -172,7 +180,7 @@ public class AutoSwitchHATest {
         for (int i = 0; i < totalPutMessageNums; i++) {
             master.putMessage(buildMessage());
         }
-        Thread.sleep(200);
+        Thread.sleep(1000);
     }
 
     private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
@@ -187,12 +195,61 @@ public class AutoSwitchHATest {
         }
     }
 
+    @Test
+    public void testConfirmOffset() throws Exception {
+        init(defaultMappedFileSize, true);
+        // Step1, set syncStateSet. If both broker1 and broker2 are in syncStateSet, the confirmOffset will be computed as the min slaveAckOffset (broker2's ack).
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001")));
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
+
+        // Step2, shutdown store2
+        this.messageStore2.shutdown();
+
+        // Put some messages; each put should fail with FLUSH_SLAVE_TIMEOUT.
+        for (int i = 0; i < 3; i++) {
+            final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
+            assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+        }
+
+        // The confirmOffset still doesn't change, because syncStateSet contains broker2, but broker2 has been shut down.
+        assertEquals(confirmOffset, ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset());
+
+        // Step3, shutdown store1, start store2, change store2 to master, epoch = 2
+        this.messageStore1.shutdown();
+
+        storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
+        messageStore2 = buildMessageStore(storeConfig2, 2L);
+        assertTrue(messageStore2.load());
+        messageStore2.start();
+        messageStore2.getHaService().changeToMaster(2);
+        ((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
+        Thread.sleep(6000);
+
+        // Put message on master
+        for (int i = 0; i < 10; i++) {
+            messageStore2.putMessage(buildMessage());
+        }
+        Thread.sleep(200);
+
+        // Step4, start store1; it should truncate dirty logs and sync the log from store2.
+        storeConfig1.setBrokerRole(BrokerRole.SLAVE);
+        messageStore1 = buildMessageStore(storeConfig1, 1L);
+        assertTrue(messageStore1.load());
+        messageStore1.start();
+        messageStore1.getHaService().changeToSlave("", 2, 1L);
+        messageStore1.getHaService().updateHaMasterAddress(this.store2HaAddress);
+        Thread.sleep(6000);
+
+        checkMessage(this.messageStore1, 20, 0);
+    }
+
     @Test
     public void testAsyncLearnerBrokerRole() throws Exception {
         init(defaultMappedFileSize);
-        ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
-        ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
 
         storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
@@ -201,7 +258,6 @@ public class AutoSwitchHATest {
         messageStore2.getHaService().changeToSlave("", 1, 2L);
         messageStore2.getHaService().updateHaMasterAddress(store1HaAddress);
         Thread.sleep(6000);
-
         // Put message on master
         for (int i = 0; i < 10; i++) {
             messageStore1.putMessage(buildMessage());
@@ -219,16 +275,13 @@ public class AutoSwitchHATest {
     public void testOptionAllAckInSyncStateSet() throws Exception {
         init(defaultMappedFileSize, true);
         AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
-        ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
-        ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
         ((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> {
             System.out.println("Get newSyncStateSet:" + newSyncStateSet);
             syncStateSet.set(newSyncStateSet);
         });
 
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
-        Thread.sleep(1000);
         checkMessage(this.messageStore2, 10, 0);
 
         Thread.sleep(1000);
@@ -251,10 +304,12 @@ public class AutoSwitchHATest {
     public void testChangeRoleManyTimes() throws Exception {
         // Step1, change store1 to master, store2 to follower
         init(defaultMappedFileSize);
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
 
         // Step2, change store1 to follower, store2 to master, epoch = 2
+        ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
         changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore1, 1, this.storeConfig1, 2, store2HaAddress, 10);
         checkMessage(this.messageStore1, 20, 0);
 
@@ -267,6 +322,8 @@ public class AutoSwitchHATest {
     public void testAddBroker() throws Exception {
         // Step1: broker1 as leader, broker2 as follower
         init(defaultMappedFileSize);
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
 
@@ -285,8 +342,10 @@ public class AutoSwitchHATest {
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
 
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
+
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
         checkMessage(this.messageStore2, 20, 0);
 
@@ -321,6 +380,7 @@ public class AutoSwitchHATest {
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
 
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
@@ -348,6 +408,7 @@ public class AutoSwitchHATest {
         checkMessage(messageStore3, 10, 10);
 
         // Step5: change broker2 as leader, broker3 as follower
+        ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
         changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore3, 3, this.storeConfig3, 3, this.store2HaAddress, 10);
         checkMessage(messageStore3, 20, 10);
 
@@ -359,19 +420,18 @@ public class AutoSwitchHATest {
         checkMessage(messageStore1, 20, 0);
     }
 
-
     @Test
     public void testAddBrokerAndSyncFromLastFile() throws Exception {
         init(1700);
 
         // Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
         checkMessage(this.messageStore2, 10, 0);
         changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
         checkMessage(this.messageStore2, 20, 0);
 
-
         // Step2: restart broker3
         messageStore3.shutdown();
         messageStore3.destroy();
@@ -388,7 +448,6 @@ public class AutoSwitchHATest {
         checkMessage(messageStore3, 10, 10);
     }
 
-
     @After
     public void destroy() throws Exception {
         Thread.sleep(5000L);