You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/11/17 12:06:40 UTC

[rocketmq] branch develop updated: [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0c3b40586 [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)
0c3b40586 is described below

commit 0c3b40586a2b7ec676247f54be32d1466124dda8
Author: mxsm <lj...@gmail.com>
AuthorDate: Thu Nov 17 20:06:34 2022 +0800

    [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)
---
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 45 +++++++++++++---------
 .../ha/autoswitch/AutoSwitchHAConnection.java      | 42 ++++++++++++++------
 2 files changed, 57 insertions(+), 30 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 7461279c7..fc85d4054 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -445,27 +445,32 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
             try {
                 while (true) {
                     int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
-                    if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
-                        int processPosition = AutoSwitchHAClient.this.processPosition;
-                        int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
-                        int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
-                        long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
-                        int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
-                        long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16);
-                        long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8);
-
+                    if (diff >= AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE) {
+                        final int processPosition = AutoSwitchHAClient.this.processPosition;
+                        int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 20);
+                        int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 16);
+                        long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 12);
+                        int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
+                        long masterEpochStartOffset = 0;
+                        long confirmOffset = 0;
+                        // A TRANSFER-state message carries a longer header; once it has fully arrived, also read masterEpochStartOffset and confirmOffset.
+                        if (masterState == HAConnectionState.TRANSFER.ordinal() && diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
+                            masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
+                            confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
+                        }
                         if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
-                            AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+                            int headerSize = masterState == HAConnectionState.TRANSFER.ordinal() ? AutoSwitchHAConnection.TRANSFER_HEADER_SIZE : AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
+                            AutoSwitchHAClient.this.processPosition += headerSize + bodySize;
                             AutoSwitchHAClient.this.waitForRunning(1);
                             LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
                                 masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
-                            return true;
+                            return false;
                         }
 
-                        if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) {
+                        if (diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize || diff >= AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
                             switch (AutoSwitchHAClient.this.currentState) {
-                                case HANDSHAKE:
-                                    AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE;
+                                case HANDSHAKE: {
+                                    AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
                                     // Truncate log
                                     int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
                                     final int entryNums = bodySize / entrySize;
@@ -483,14 +488,14 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                         LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
                                         return false;
                                     }
-                                    break;
-                                case TRANSFER:
+                                }
+                                break;
+                                case TRANSFER: {
                                     byte[] bodyData = new byte[bodySize];
-                                    byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE);
+                                    byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
                                     byteBufferRead.get(bodyData);
                                     byteBufferRead.position(readSocketPos);
-                                    AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
-
+                                    AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
                                     long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
                                     if (slavePhyOffset != 0) {
                                         if (slavePhyOffset != masterOffset) {
@@ -517,11 +522,13 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                         return false;
                                     }
                                     break;
+                                }
                                 default:
                                     break;
                             }
                             continue;
                         }
+
                     }
 
                     if (!byteBufferRead.hasRemaining()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index de20625aa..755b89ade 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -40,12 +40,36 @@ import org.apache.rocketmq.store.ha.io.AbstractHAReader;
 import org.apache.rocketmq.store.ha.io.HAWriter;
 
 public class AutoSwitchHAConnection implements HAConnection {
+
+    /**
+     * Data protocol of the handshake message sent by the master during syncing. Format:
+     * <pre>
+     * +----------------------------------------------------------------------------------------------+
+     * |  current state  |   body size   |   offset  |   epoch   |   EpochEntrySize * EpochEntryNums  |
+     * |     (4bytes)    |   (4bytes)    |  (8bytes) |  (4bytes) |      (12bytes * EpochEntryNums)    |
+     * +----------------------------------------------------------------------------------------------+
+     * |                       Header                            |             Body                   |
+     * |                                                         |                                    |
+     * </pre>
+     * Handshake Header protocol Format:
+     *  current state + body size + offset + epoch
+     */
+    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+
     /**
-     * Header protocol in syncing msg from master. Format: current state + body size + offset + epoch  +
-     * epochStartOffset + additionalInfo(confirmOffset). If the msg is handShakeMsg, the body size = EpochEntrySize *
-     * EpochEntryNums, the offset is maxOffset in master.
+     * Data protocol of the transfer message sent by the master during syncing. Format:
+     * <pre>
+     * +---------------------------------------------------------------------------------------------------------------------+
+     * |  current state  |   body size   |   offset  |   epoch   |   epochStartOffset  |   confirmOffset  |    log data      |
+     * |     (4bytes)    |   (4bytes)    |  (8bytes) |  (4bytes) |      (8bytes)       |      (8bytes)    |   (data size)    |
+     * +---------------------------------------------------------------------------------------------------------------------+
+     * |                                               Header                                             |       Body       |
+     * |                                                                                                  |                  |
+     * </pre>
+     * Transfer Header protocol Format:
+     *  current state + body size + offset + epoch + epochStartOffset + additionalInfo(confirmOffset)
      */
-    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+    public static final int TRANSFER_HEADER_SIZE = HANDSHAKE_HEADER_SIZE + 8 + 8;
     public static final int EPOCH_ENTRY_SIZE = 12;
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final AutoSwitchHAService haService;
@@ -433,7 +457,7 @@ public class AutoSwitchHAConnection implements HAConnection {
         protected final SocketChannel socketChannel;
         protected final HAWriter haWriter;
 
-        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
         // Store master epochFileCache: (Epoch + startOffset) * 1000
         private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
         protected long nextTransferFromWhere = -1;
@@ -466,7 +490,7 @@ public class AutoSwitchHAConnection implements HAConnection {
             final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
             final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
             this.byteBufferHeader.position(0);
-            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            this.byteBufferHeader.limit(HANDSHAKE_HEADER_SIZE);
             // State
             this.byteBufferHeader.putInt(currentState.ordinal());
             // Body size
@@ -475,10 +499,6 @@ public class AutoSwitchHAConnection implements HAConnection {
             this.byteBufferHeader.putLong(maxPhyOffset);
             // Epoch
             this.byteBufferHeader.putInt(lastEpoch);
-            // EpochStartOffset (not needed in handshake)
-            this.byteBufferHeader.putLong(0L);
-            // Additional info (not needed in handshake)
-            this.byteBufferHeader.putLong(0L);
             this.byteBufferHeader.flip();
 
             // EpochEntries
@@ -527,7 +547,7 @@ public class AutoSwitchHAConnection implements HAConnection {
             }
             // Build Header
             this.byteBufferHeader.position(0);
-            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
             // State
             this.byteBufferHeader.putInt(currentState.ordinal());
             // Body size