You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/11/17 12:06:40 UTC
[rocketmq] branch develop updated: [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0c3b40586 [ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)
0c3b40586 is described below
commit 0c3b40586a2b7ec676247f54be32d1466124dda8
Author: mxsm <lj...@gmail.com>
AuthorDate: Thu Nov 17 20:06:34 2022 +0800
[ISSUE #5157] Optimize AutoSwitchHAConnection HandShake's data protocol (#5182)
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 45 +++++++++++++---------
.../ha/autoswitch/AutoSwitchHAConnection.java | 42 ++++++++++++++------
2 files changed, 57 insertions(+), 30 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 7461279c7..fc85d4054 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -445,27 +445,32 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
try {
while (true) {
int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
- if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
- int processPosition = AutoSwitchHAClient.this.processPosition;
- int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
- int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
- long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
- int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
- long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16);
- long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8);
-
+ if (diff >= AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE) {
+ final int processPosition = AutoSwitchHAClient.this.processPosition;
+ int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 20);
+ int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 16);
+ long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 12);
+ int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
+ long masterEpochStartOffset = 0;
+ long confirmOffset = 0;
+ // if master send transfer header data, set masterEpochStartOffset and confirmOffset value.
+ if (masterState == HAConnectionState.TRANSFER.ordinal() && diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
+ masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
+ confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
+ }
if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
- AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+ int headerSize = masterState == HAConnectionState.TRANSFER.ordinal() ? AutoSwitchHAConnection.TRANSFER_HEADER_SIZE : AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
+ AutoSwitchHAClient.this.processPosition += headerSize + bodySize;
AutoSwitchHAClient.this.waitForRunning(1);
LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
- return true;
+ return false;
}
- if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) {
+ if (diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize || diff >= AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
switch (AutoSwitchHAClient.this.currentState) {
- case HANDSHAKE:
- AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE;
+ case HANDSHAKE: {
+ AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE;
// Truncate log
int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
final int entryNums = bodySize / entrySize;
@@ -483,14 +488,14 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
return false;
}
- break;
- case TRANSFER:
+ }
+ break;
+ case TRANSFER: {
byte[] bodyData = new byte[bodySize];
- byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE);
+ byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE);
byteBufferRead.get(bodyData);
byteBufferRead.position(readSocketPos);
- AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
-
+ AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize;
long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterOffset) {
@@ -517,11 +522,13 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return false;
}
break;
+ }
default:
break;
}
continue;
}
+
}
if (!byteBufferRead.hasRemaining()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index de20625aa..755b89ade 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -40,12 +40,36 @@ import org.apache.rocketmq.store.ha.io.AbstractHAReader;
import org.apache.rocketmq.store.ha.io.HAWriter;
public class AutoSwitchHAConnection implements HAConnection {
+
+ /**
+ * Handshake data protocol in syncing msg from master. Format:
+ * <pre>
+ * +----------------------------------------------------------------------------------------------+
+ * | current state | body size | offset | epoch | EpochEntrySize * EpochEntryNums |
+ * | (4bytes) | (4bytes) | (8bytes) | (4bytes) | (12bytes * EpochEntryNums) |
+ * +----------------------------------------------------------------------------------------------+
+ * | Header | Body |
+ * | | |
+ * </pre>
+ * Handshake Header protocol Format:
+ * current state + body size + offset + epoch
+ */
+ public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+
/**
- * Header protocol in syncing msg from master. Format: current state + body size + offset + epoch +
- * epochStartOffset + additionalInfo(confirmOffset). If the msg is handShakeMsg, the body size = EpochEntrySize *
- * EpochEntryNums, the offset is maxOffset in master.
+ * Transfer data protocol in syncing msg from master. Format:
+ * <pre>
+ * +---------------------------------------------------------------------------------------------------------------------+
+ * | current state | body size | offset | epoch | epochStartOffset | confirmOffset | log data |
+ * | (4bytes) | (4bytes) | (8bytes) | (4bytes) | (8bytes) | (8bytes) | (data size) |
+ * +---------------------------------------------------------------------------------------------------------------------+
+ * | Header | Body |
+ * | | |
+ * </pre>
+ * Transfer Header protocol Format:
+ * current state + body size + offset + epoch + epochStartOffset + additionalInfo(confirmOffset)
*/
- public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+ public static final int TRANSFER_HEADER_SIZE = HANDSHAKE_HEADER_SIZE + 8 + 8;
public static final int EPOCH_ENTRY_SIZE = 12;
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final AutoSwitchHAService haService;
@@ -433,7 +457,7 @@ public class AutoSwitchHAConnection implements HAConnection {
protected final SocketChannel socketChannel;
protected final HAWriter haWriter;
- protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+ protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
// Store master epochFileCache: (Epoch + startOffset) * 1000
private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
protected long nextTransferFromWhere = -1;
@@ -466,7 +490,7 @@ public class AutoSwitchHAConnection implements HAConnection {
final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+ this.byteBufferHeader.limit(HANDSHAKE_HEADER_SIZE);
// State
this.byteBufferHeader.putInt(currentState.ordinal());
// Body size
@@ -475,10 +499,6 @@ public class AutoSwitchHAConnection implements HAConnection {
this.byteBufferHeader.putLong(maxPhyOffset);
// Epoch
this.byteBufferHeader.putInt(lastEpoch);
- // EpochStartOffset (not needed in handshake)
- this.byteBufferHeader.putLong(0L);
- // Additional info (not needed in handshake)
- this.byteBufferHeader.putLong(0L);
this.byteBufferHeader.flip();
// EpochEntries
@@ -527,7 +547,7 @@ public class AutoSwitchHAConnection implements HAConnection {
}
// Build Header
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+ this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
// State
this.byteBufferHeader.putInt(currentState.ordinal());
// Body size