You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/25 13:41:28 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Support async learner in controller mode (#4367)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 25ee6eb32 [Summer of code] Support async learner in controller mode (#4367)
25ee6eb32 is described below

commit 25ee6eb322da4a5c901d60e09b7e255ff0e392d7
Author: hzh0425 <64...@qq.com>
AuthorDate: Wed May 25 21:41:20 2022 +0800

    [Summer of code] Support async learner in controller mode (#4367)
    
    * merge branch support_async_learner
    
    * use isSlave() to replace BrokerRole == Slave
    
    * mark asyncLearner
    
    * code review
    
    * Revert "use isSlave() to replace BrokerRole == Slave"
    
    This reverts commit 6599f97f44ece74e6a2e4cbbfc062c31c96237ec.
    
    * review
    
    * remove asyncLearner role
    
    * code review
    
    * code review
---
 .../rocketmq/store/config/MessageStoreConfig.java  | 10 ++++
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 33 ++++++-------
 .../ha/autoswitch/AutoSwitchHAConnection.java      | 54 +++++++++++++---------
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 28 +++++++++++
 4 files changed, 85 insertions(+), 40 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 310865805..80ac3f79d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -306,6 +306,8 @@ public class MessageStoreConfig {
      */
     private boolean syncFromLastFile = false;
 
+    private boolean isAsyncLearner = false;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -1322,4 +1324,12 @@ public class MessageStoreConfig {
     public void setScheduleAsyncDeliverMaxResendNum2Blocked(int scheduleAsyncDeliverMaxResendNum2Blocked) {
         this.scheduleAsyncDeliverMaxResendNum2Blocked = scheduleAsyncDeliverMaxResendNum2Blocked;
     }
+
+    public boolean isAsyncLearner() {
+        return isAsyncLearner;
+    }
+
+    public void setAsyncLearner(boolean asyncLearner) {
+        this.isAsyncLearner = asyncLearner;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 81d2ecf17..f784d3873 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -44,9 +44,10 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
     /**
-     * Handshake header buffer size. Schema: state ordinal + flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+     * Handshake header buffer size. Schema: state ordinal (int) + two flags (one short each) + slaveAddressLength (int).
+     * Flags: isSyncFromLastFile (short), isAsyncLearner (short). More flags can be added in the future if needed.
      */
-    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
 
     /**
      * Header + slaveAddress.
@@ -97,10 +98,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
      */
     private volatile long confirmOffset;
 
-    public static final int SYNC_FROM_LAST_FILE = -1;
-
-    public static final int SYNC_FROM_FIRST_FILE = -2;
-
     public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
         EpochFileCache epochCache) throws IOException {
         this.haService = haService;
@@ -256,13 +253,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         // Original state
         this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
         // IsSyncFromLastFile
-        if (this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()) {
-            this.handshakeHeaderBuffer.putInt(SYNC_FROM_LAST_FILE);
-        } else {
-            this.handshakeHeaderBuffer.putInt(SYNC_FROM_FIRST_FILE);
-        }
-        // Slave Id
-        this.handshakeHeaderBuffer.putLong(this.slaveId.get());
+        short isSyncFromLastFile = this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short)1 : (short) 0;
+        this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
+        // IsAsyncLearner role
+        short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short)1 : (short) 0;
+        this.handshakeHeaderBuffer.putShort(isAsyncLearner);
         // Address length
         this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length());
         // Slave address
@@ -443,12 +438,12 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                 int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
                 if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
                     int processPosition =  AutoSwitchHAClient.this.processPosition;
-                    int masterState = byteBufferRead.getInt(processPosition);
-                    int bodySize = byteBufferRead.getInt(processPosition + 4);
-                    long masterOffset = byteBufferRead.getLong(processPosition + 4 + 4);
-                    int masterEpoch = byteBufferRead.getInt(processPosition + 4 + 4 + 8);
-                    long masterEpochStartOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
-                    long confirmOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+                    int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
+                    int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
+                    long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
+                    int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
+                    long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16);
+                    long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8);
 
                     if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
                         AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index f29ff283e..ef03dc583 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -64,6 +64,7 @@ public class AutoSwitchHAConnection implements HAConnection {
     private volatile int currentTransferEpoch = -1;
     private volatile long currentTransferEpochEndOffset = 0;
     private volatile boolean isSyncFromLastFile = false;
+    private volatile boolean isAsyncLearner = false;
     private volatile long slaveId = -1;
     private volatile String slaveAddress;
 
@@ -172,13 +173,21 @@ public class AutoSwitchHAConnection implements HAConnection {
         }
     }
 
+    public boolean isAsyncLearner() {
+        return isAsyncLearner;
+    }
+
+    public boolean isSyncFromLastFile() {
+        return isSyncFromLastFile;
+    }
+
     private synchronized void updateLastTransferInfo() {
         this.lastMasterMaxOffset = this.haService.getDefaultMessageStore().getMaxPhyOffset();
         this.lastTransferTimeMs = System.currentTimeMillis();
     }
 
     private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
-        if (slaveMaxOffset >= this.lastMasterMaxOffset) {
+        if (!this.isAsyncLearner && slaveMaxOffset >= this.lastMasterMaxOffset) {
             this.lastCatchUpTimeMs = Math.max(this.lastTransferTimeMs, this.lastCatchUpTimeMs);
             this.haService.maybeExpandInSyncStateSet(this.slaveAddress, slaveMaxOffset);
         }
@@ -277,30 +286,33 @@ public class AutoSwitchHAConnection implements HAConnection {
                         switch (slaveState) {
                             case HANDSHAKE:
                                 if (diff >= AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE) {
-                                    isSlaveSendHandshake = true;
-                                    // Flag(SyncFromLastFile)
-                                    long syncFromLastFileFlag = byteBufferRead.getInt(readPosition + 4);
-                                    if (syncFromLastFileFlag == AutoSwitchHAClient.SYNC_FROM_LAST_FILE) {
+                                    // AddressLength
+                                    int addressLength = byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 4);
+                                    if (diff < AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
+                                        break;
+                                    }
+                                    // Flag(isSyncFromLastFile)
+                                    short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8);
+                                    if (syncFromLastFileFlag == 1) {
                                         AutoSwitchHAConnection.this.isSyncFromLastFile = true;
-                                        LOGGER.info("Slave request sync from lastFile");
                                     }
-                                    // SlaveId
-                                    AutoSwitchHAConnection.this.slaveId = byteBufferRead.getLong(readPosition + 8);
-                                    // AddressLength
-                                    int addressLength = byteBufferRead.getInt(readPosition + 16);
-                                    // Address
-                                    if (diff >= AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
-                                        final byte[] addressData = new byte[addressLength];
-                                        byteBufferRead.position(readPosition + 20);
-                                        byteBufferRead.get(addressData);
-                                        AutoSwitchHAConnection.this.slaveAddress = new String(addressData);
-                                    } else {
-                                        AutoSwitchHAConnection.this.slaveAddress = "";
+                                    // Flag(isAsyncLearner role)
+                                    short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 6);
+                                    if (isAsyncLearner == 1) {
+                                        AutoSwitchHAConnection.this.isAsyncLearner = true;
                                     }
-                                    LOGGER.info("Receive slave handshake, syncFromLastFile:{}, slaveId:{}, slaveAddress:{}",
-                                        AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress);
+                                    // Address
+                                    final byte[] addressData = new byte[addressLength];
+                                    byteBufferRead.position(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
+                                    byteBufferRead.get(addressData);
+                                    AutoSwitchHAConnection.this.slaveAddress = new String(addressData);
+
+                                    isSlaveSendHandshake = true;
                                     byteBufferRead.position(readSocketPos);
                                     ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
+                                    LOGGER.info("Receive slave handshake, slaveId:{}, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
+                                        AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress,
+                                        AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner);
                                 }
                                 break;
                             case TRANSFER:
@@ -312,10 +324,10 @@ public class AutoSwitchHAConnection implements HAConnection {
                                     if (slaveRequestOffset < 0) {
                                         slaveRequestOffset = slaveMaxOffset;
                                     }
-                                    LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                     byteBufferRead.position(readSocketPos);
                                     maybeExpandInSyncStateSet(slaveMaxOffset);
                                     AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                    LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                 }
                                 break;
                             default:
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 777a62036..8e8f98db0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -45,6 +45,7 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AutoSwitchHATest {
@@ -184,6 +185,33 @@ public class AutoSwitchHATest {
         }
     }
 
+    @Test
+    public void testAsyncLearnerBrokerRole() throws Exception {
+        init(defaultMappedFileSize);
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+        ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+
+        storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+        storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig2.setAsyncLearner(true);
+        messageStore1.getHaService().changeToMaster(1);
+        messageStore2.getHaService().changeToSlave("", 1, 2L);
+        messageStore2.getHaService().updateHaMasterAddress(store1HaAddress);
+        Thread.sleep(6000);
+
+        // Put message on master
+        for (int i = 0; i < 10; i++) {
+            messageStore1.putMessage(buildMessage());
+        }
+        Thread.sleep(200);
+
+        checkMessage(messageStore2, 10, 0);
+
+        Thread.sleep(1000);
+        final Set<String> syncStateSet = ((AutoSwitchHAService) this.messageStore1.getHaService()).getSyncStateSet();
+        assertFalse(syncStateSet.contains("127.0.0.1:8001"));
+    }
+
     @Test
     public void testOptionAllAckInSyncStateSet() throws Exception {
         init(defaultMappedFileSize, true);