You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/25 13:41:28 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Support async learner in controller mode (#4367)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 25ee6eb32 [Summer of code] Support async learner in controller mode (#4367)
25ee6eb32 is described below
commit 25ee6eb322da4a5c901d60e09b7e255ff0e392d7
Author: hzh0425 <64...@qq.com>
AuthorDate: Wed May 25 21:41:20 2022 +0800
[Summer of code] Support async learner in controller mode (#4367)
* merge branch support_async_learner
* use isSlave() to replace BrokerRole == Slave
* mark asyncLearner
* code review
* Revert "use isSlave() to replace BrokerRole == Slave"
This reverts commit 6599f97f44ece74e6a2e4cbbfc062c31c96237ec.
* review
* remove asyncLearner role
* code review
* code review
---
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++
.../store/ha/autoswitch/AutoSwitchHAClient.java | 33 ++++++-------
.../ha/autoswitch/AutoSwitchHAConnection.java | 54 +++++++++++++---------
.../store/ha/autoswitch/AutoSwitchHATest.java | 28 +++++++++++
4 files changed, 85 insertions(+), 40 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 310865805..80ac3f79d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -306,6 +306,8 @@ public class MessageStoreConfig {
*/
private boolean syncFromLastFile = false;
+ private boolean isAsyncLearner = false;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -1322,4 +1324,12 @@ public class MessageStoreConfig {
public void setScheduleAsyncDeliverMaxResendNum2Blocked(int scheduleAsyncDeliverMaxResendNum2Blocked) {
this.scheduleAsyncDeliverMaxResendNum2Blocked = scheduleAsyncDeliverMaxResendNum2Blocked;
}
+
+ public boolean isAsyncLearner() {
+ return isAsyncLearner;
+ }
+
+ public void setAsyncLearner(boolean asyncLearner) {
+ this.isAsyncLearner = asyncLearner;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 81d2ecf17..f784d3873 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -44,9 +44,10 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
public class AutoSwitchHAClient extends ServiceThread implements HAClient {
/**
- * Handshake header buffer size. Schema: state ordinal + flag(isSyncFromLastFile) + slaveId + slaveAddressLength.
+ * Handshake header buffer size. Schema: state ordinal (int) + two flags (short each) + slaveAddressLength (int).
+ * Flags: isSyncFromLastFile (short), isAsyncLearner (short). More flags can be added in the future if needed.
*/
- public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 8 + 4;
+ public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
/**
* Header + slaveAddress.
@@ -97,10 +98,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
*/
private volatile long confirmOffset;
- public static final int SYNC_FROM_LAST_FILE = -1;
-
- public static final int SYNC_FROM_FIRST_FILE = -2;
-
public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
EpochFileCache epochCache) throws IOException {
this.haService = haService;
@@ -256,13 +253,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
// Original state
this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
// IsSyncFromLastFile
- if (this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()) {
- this.handshakeHeaderBuffer.putInt(SYNC_FROM_LAST_FILE);
- } else {
- this.handshakeHeaderBuffer.putInt(SYNC_FROM_FIRST_FILE);
- }
- // Slave Id
- this.handshakeHeaderBuffer.putLong(this.slaveId.get());
+ short isSyncFromLastFile = this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short)1 : (short) 0;
+ this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
+ // IsAsyncLearner role
+ short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short)1 : (short) 0;
+ this.handshakeHeaderBuffer.putShort(isAsyncLearner);
// Address length
this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length());
// Slave address
@@ -443,12 +438,12 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
int processPosition = AutoSwitchHAClient.this.processPosition;
- int masterState = byteBufferRead.getInt(processPosition);
- int bodySize = byteBufferRead.getInt(processPosition + 4);
- long masterOffset = byteBufferRead.getLong(processPosition + 4 + 4);
- int masterEpoch = byteBufferRead.getInt(processPosition + 4 + 4 + 8);
- long masterEpochStartOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
- long confirmOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+ int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36);
+ int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32);
+ long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28);
+ int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20);
+ long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16);
+ long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8);
if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index f29ff283e..ef03dc583 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -64,6 +64,7 @@ public class AutoSwitchHAConnection implements HAConnection {
private volatile int currentTransferEpoch = -1;
private volatile long currentTransferEpochEndOffset = 0;
private volatile boolean isSyncFromLastFile = false;
+ private volatile boolean isAsyncLearner = false;
private volatile long slaveId = -1;
private volatile String slaveAddress;
@@ -172,13 +173,21 @@ public class AutoSwitchHAConnection implements HAConnection {
}
}
+ public boolean isAsyncLearner() {
+ return isAsyncLearner;
+ }
+
+ public boolean isSyncFromLastFile() {
+ return isSyncFromLastFile;
+ }
+
private synchronized void updateLastTransferInfo() {
this.lastMasterMaxOffset = this.haService.getDefaultMessageStore().getMaxPhyOffset();
this.lastTransferTimeMs = System.currentTimeMillis();
}
private synchronized void maybeExpandInSyncStateSet(long slaveMaxOffset) {
- if (slaveMaxOffset >= this.lastMasterMaxOffset) {
+ if (!this.isAsyncLearner && slaveMaxOffset >= this.lastMasterMaxOffset) {
this.lastCatchUpTimeMs = Math.max(this.lastTransferTimeMs, this.lastCatchUpTimeMs);
this.haService.maybeExpandInSyncStateSet(this.slaveAddress, slaveMaxOffset);
}
@@ -277,30 +286,33 @@ public class AutoSwitchHAConnection implements HAConnection {
switch (slaveState) {
case HANDSHAKE:
if (diff >= AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE) {
- isSlaveSendHandshake = true;
- // Flag(SyncFromLastFile)
- long syncFromLastFileFlag = byteBufferRead.getInt(readPosition + 4);
- if (syncFromLastFileFlag == AutoSwitchHAClient.SYNC_FROM_LAST_FILE) {
+ // AddressLength
+ int addressLength = byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 4);
+ if (diff < AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
+ break;
+ }
+ // Flag(isSyncFromLastFile)
+ short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8);
+ if (syncFromLastFileFlag == 1) {
AutoSwitchHAConnection.this.isSyncFromLastFile = true;
- LOGGER.info("Slave request sync from lastFile");
}
- // SlaveId
- AutoSwitchHAConnection.this.slaveId = byteBufferRead.getLong(readPosition + 8);
- // AddressLength
- int addressLength = byteBufferRead.getInt(readPosition + 16);
- // Address
- if (diff >= AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) {
- final byte[] addressData = new byte[addressLength];
- byteBufferRead.position(readPosition + 20);
- byteBufferRead.get(addressData);
- AutoSwitchHAConnection.this.slaveAddress = new String(addressData);
- } else {
- AutoSwitchHAConnection.this.slaveAddress = "";
+ // Flag(isAsyncLearner role)
+ short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 6);
+ if (isAsyncLearner == 1) {
+ AutoSwitchHAConnection.this.isAsyncLearner = true;
}
- LOGGER.info("Receive slave handshake, syncFromLastFile:{}, slaveId:{}, slaveAddress:{}",
- AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress);
+ // Address
+ final byte[] addressData = new byte[addressLength];
+ byteBufferRead.position(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE);
+ byteBufferRead.get(addressData);
+ AutoSwitchHAConnection.this.slaveAddress = new String(addressData);
+
+ isSlaveSendHandshake = true;
byteBufferRead.position(readSocketPos);
ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength;
+ LOGGER.info("Receive slave handshake, slaveId:{}, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
+ AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress,
+ AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner);
}
break;
case TRANSFER:
@@ -312,10 +324,10 @@ public class AutoSwitchHAConnection implements HAConnection {
if (slaveRequestOffset < 0) {
slaveRequestOffset = slaveMaxOffset;
}
- LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
byteBufferRead.position(readSocketPos);
maybeExpandInSyncStateSet(slaveMaxOffset);
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+ LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
}
break;
default:
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 777a62036..8e8f98db0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -45,6 +45,7 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AutoSwitchHATest {
@@ -184,6 +185,33 @@ public class AutoSwitchHATest {
}
}
+ @Test
+ public void testAsyncLearnerBrokerRole() throws Exception {
+ init(defaultMappedFileSize);
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+
+ storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+ storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+ storeConfig2.setAsyncLearner(true);
+ messageStore1.getHaService().changeToMaster(1);
+ messageStore2.getHaService().changeToSlave("", 1, 2L);
+ messageStore2.getHaService().updateHaMasterAddress(store1HaAddress);
+ Thread.sleep(6000);
+
+ // Put message on master
+ for (int i = 0; i < 10; i++) {
+ messageStore1.putMessage(buildMessage());
+ }
+ Thread.sleep(200);
+
+ checkMessage(messageStore2, 10, 0);
+
+ Thread.sleep(1000);
+ final Set<String> syncStateSet = ((AutoSwitchHAService) this.messageStore1.getHaService()).getSyncStateSet();
+ assertFalse(syncStateSet.contains("127.0.0.1:8001"));
+ }
+
@Test
public void testOptionAllAckInSyncStateSet() throws Exception {
init(defaultMappedFileSize, true);