You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/04 14:42:07 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Polish switching logic and auto switch ha code (#4406)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new f27800a13 Polish switching logic and auto switch ha code (#4406)
f27800a13 is described below
commit f27800a134d1a595fcf9654319ea6db8a9cf1713
Author: rongtong <ji...@163.com>
AuthorDate: Sat Jun 4 22:42:01 2022 +0800
Polish switching logic and auto switch ha code (#4406)
* Polish switching logic and auto switch ha code
* Make UT can pass
* Polish the code
---
.../rocketmq/broker/hacontroller/ReplicasManager.java | 13 +++++++++----
.../controller/impl/manager/ReplicasInfoManager.java | 15 +++++++--------
.../main/java/org/apache/rocketmq/store/CommitLog.java | 2 ++
.../store/ha/autoswitch/AutoSwitchHAConnection.java | 4 +++-
.../rocketmq/store/ha/autoswitch/AutoSwitchHAService.java | 4 +---
.../rocketmq/store/ha/autoswitch/AutoSwitchHATest.java | 4 ++++
6 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 48f61f5e4..94bfa5d63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -169,6 +169,9 @@ public class ReplicasManager {
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
+ // Notify ha service, change to master
+ this.haService.changeToMaster(newMasterEpoch);
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
@@ -177,8 +180,6 @@ public class ReplicasManager {
LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e);
return;
}
- // Notify ha service, change to master
- this.haService.changeToMaster(newMasterEpoch);
LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch);
});
}
@@ -193,6 +194,7 @@ public class ReplicasManager {
// Change record
this.masterAddress = newMasterAddress;
this.masterEpoch = newMasterEpoch;
+
stopCheckSyncStateSet();
// Change config
@@ -203,6 +205,9 @@ public class ReplicasManager {
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
+ // Notify ha service, change to slave
+ this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
@@ -212,8 +217,6 @@ public class ReplicasManager {
return;
}
- // Notify ha service, change to slave
- this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
LOGGER.info("Change broker {} to slave, newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, newMasterEpoch);
});
}
@@ -227,6 +230,8 @@ public class ReplicasManager {
this.syncStateSetEpoch = newSyncStateSetEpoch;
this.syncStateSet = new HashSet<>(newSyncStateSet);
this.haService.setSyncStateSet(newSyncStateSet);
+ } else {
+ LOGGER.info("Sync state set changed failed, newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch, this.syncStateSetEpoch);
}
}
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 5e718f9a9..e4b042dae 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -49,10 +49,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
- * The manager that manages the replicas info for all brokers.
- * We can think of this class as the controller's memory state machine
- * It should be noted that this class is not thread safe,
- * and the upper layer needs to ensure that it can be called sequentially
+ * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
+ * state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
+ * be called sequentially
*/
public class ReplicasInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -131,7 +130,7 @@ public class ReplicasInfoManager {
// Generate event
int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
response.setNewSyncStateSetEpoch(epoch);
- result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(newSyncStateSet, epoch).encode());
+ result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode());
final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
result.addEvent(event);
return result;
@@ -273,7 +272,7 @@ public class ReplicasInfoManager {
if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
}
- result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
+ result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
return result;
}
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, "Broker metadata is not existed");
@@ -369,8 +368,8 @@ public class ReplicasInfoManager {
final String clusterName = event.getClusterName();
final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName);
brokerInfo.addBroker(newMaster, 1L);
- final SyncStateInfo replicasInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
- this.syncStateSetInfoTable.put(brokerName, replicasInfo);
+ final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
+ this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
this.replicaInfoTable.put(brokerName, brokerInfo);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 5a96b5345..98b4fc937 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -352,6 +352,8 @@ public class CommitLog implements Swappable {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
+
+
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index d8a26e20e..dfd3215d2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -328,7 +328,9 @@ public class AutoSwitchHAConnection implements HAConnection {
slaveRequestOffset = slaveMaxOffset;
}
byteBufferRead.position(readSocketPos);
- maybeExpandInSyncStateSet(slaveMaxOffset);
+ if (!haService.getSyncStateSet().contains(slaveAddress)) {
+ maybeExpandInSyncStateSet(slaveMaxOffset);
+ }
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
break;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 84b047e2d..38758d663 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -100,9 +100,6 @@ public class AutoSwitchHAService extends DefaultHAService {
this.defaultMessageStore.recoverTopicQueueTable();
- final HashSet<String> newSyncStateSet = new HashSet<>();
- newSyncStateSet.add(this.localAddress);
- setSyncStateSet(newSyncStateSet);
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
@@ -122,6 +119,7 @@ public class AutoSwitchHAService extends DefaultHAService {
this.haClient.setLocalAddress(this.localAddress);
this.haClient.updateSlaveId(slaveId);
this.haClient.updateMasterAddress(newMasterAddr);
+ this.haClient.updateHaMasterAddress(null);
this.haClient.start();
LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 7070db627..dfbc35f81 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,6 +21,8 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -189,6 +191,7 @@ public class AutoSwitchHATest {
public void testAsyncLearnerBrokerRole() throws Exception {
init(defaultMappedFileSize);
((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
@@ -217,6 +220,7 @@ public class AutoSwitchHATest {
init(defaultMappedFileSize, true);
AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> {
System.out.println("Get newSyncStateSet:" + newSyncStateSet);