You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/06 09:01:10 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new a3670944e Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable
a3670944e is described below
commit a3670944e19f5c7d193c73fa7343968da4feda5e
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon Jun 6 17:00:54 2022 +0800
Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable
---
.../rocketmq/broker/hacontroller/ReplicasManager.java | 9 ++++-----
.../java/org/apache/rocketmq/common/BrokerConfig.java | 6 +-----
.../apache/rocketmq/controller/ControllerManager.java | 2 +-
.../apache/rocketmq/namesrv/NamesrvController.java | 3 +++
.../store/ha/autoswitch/AutoSwitchHAService.java | 19 +++++++++----------
.../test/autoswitchrole/AutoSwitchRoleBase.java | 4 ++--
.../autoswitchrole/AutoSwitchRoleIntegrationTest.java | 4 ++--
7 files changed, 22 insertions(+), 25 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index a5c141938..a25924e84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -206,10 +206,7 @@ public class ReplicasManager {
handleSlaveSynchronize(BrokerRole.SLAVE);
// Notify ha service, change to slave
- if (!this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId())) {
- LOGGER.info("Failed to change ha role to slave");
- return;
- }
+ this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
this.executorService.submit(() -> {
// Register broker to name-srv
@@ -358,6 +355,9 @@ public class ReplicasManager {
* Scheduling check syncStateSet.
*/
private void schedulingCheckSyncStateSet() {
+ if (this.checkSyncStateSetTaskFuture != null) {
+ this.checkSyncStateSetTaskFuture.cancel(false);
+ }
this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
final Set<String> newSyncStateSet = this.haService.maybeShrinkInSyncStateSet();
newSyncStateSet.add(this.localAddress);
@@ -388,7 +388,6 @@ public class ReplicasManager {
private void stopCheckSyncStateSet() {
if (this.checkSyncStateSetTaskFuture != null) {
this.checkSyncStateSetTaskFuture.cancel(false);
- this.checkSyncStateSetTaskFuture = null;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 10072249d..5bf85f86b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -305,15 +305,11 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean controllerDeployedStandAlone = false;
- /**
- * If isControllerDeployedStandAlone = false, controllerAddr should be the addresses of name-srv which running the controller instance.
- * If isControllerDeployedStandAlone = true, controllerAddr should be controller's address.
- */
private String controllerAddr = "";
private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
- private long replicasManagerCheckSyncStateSetPeriod = 8 * 1000;
+ private long replicasManagerCheckSyncStateSetPeriod = 5 * 1000;
private long replicasManagerSyncControllerMetadataPeriod = 10 * 1000;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 51e3ccb87..01f87903d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -135,8 +135,8 @@ public class ControllerManager {
public void shutdown() {
this.heartbeatManager.shutdown();
- this.controller.shutdown();
this.controllerRequestExecutor.shutdown();
+ this.controller.shutdown();
}
public BrokerHeartbeatManager getHeartbeatManager() {
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 0659e5e2f..08da5d0a4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -311,6 +311,9 @@ public class NamesrvController {
if (this.controllerRequestExecutor != null) {
this.controllerRequestExecutor.shutdown();
}
+ if (this.controllerConfig.isEnableStartupController()) {
+ this.controller.shutdown();
+ }
this.scheduledExecutorService.shutdown();
this.scanExecutorService.shutdown();
this.routeInfoManager.shutdown();
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index e1446b3f7..3c908393d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -80,6 +80,7 @@ public class AutoSwitchHAService extends DefaultHAService {
@Override public boolean changeToMaster(int masterEpoch) {
final int lastEpoch = this.epochCache.lastEpoch();
if (masterEpoch < lastEpoch) {
+ LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to master", masterEpoch, lastEpoch);
return false;
}
destroyConnections();
@@ -110,6 +111,7 @@ public class AutoSwitchHAService extends DefaultHAService {
@Override public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slaveId) {
final int lastEpoch = this.epochCache.lastEpoch();
if (newMasterEpoch < lastEpoch) {
+ LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to slave", newMasterEpoch, lastEpoch);
return false;
}
try {
@@ -158,20 +160,18 @@ public class AutoSwitchHAService extends DefaultHAService {
}
/**
- * Check and maybe shrink the inSyncStateSet.
- * A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
+ * Check and maybe shrink the inSyncStateSet. A slave will be removed from inSyncStateSet if (curTime -
+ * HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
*/
public Set<String> maybeShrinkInSyncStateSet() {
- final Set<String> currentSyncStateSet = getSyncStateSet();
- final HashSet<String> newSyncStateSet = new HashSet<>(currentSyncStateSet);
+ final Set<String> newSyncStateSet = getSyncStateSet();
final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
- for (Map.Entry<String, Long> next: this.connectionCaughtUpTimeTable.entrySet()) {
+ for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) {
final String slaveAddress = next.getKey();
- if (currentSyncStateSet.contains(slaveAddress)) {
+ if (newSyncStateSet.contains(slaveAddress)) {
final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress);
if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) {
newSyncStateSet.remove(slaveAddress);
- this.connectionCaughtUpTimeTable.remove(slaveAddress);
}
}
}
@@ -179,9 +179,8 @@ public class AutoSwitchHAService extends DefaultHAService {
}
/**
- * Check and maybe add the slave to inSyncStateSet.
- * A slave will be added to inSyncStateSet if its slaveMaxOffset >= current confirmOffset, and it is caught up to
- * an offset within the current leader epoch.
+ * Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >=
+ * current confirmOffset, and it is caught up to an offset within the current leader epoch.
*/
public void maybeExpandInSyncStateSet(final String slaveAddress, final long slaveMaxOffset) {
final Set<String> currentSyncStateSet = getSyncStateSet();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index c7b515584..d9968b610 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -76,13 +76,13 @@ public class AutoSwitchRoleBase {
public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
- storeConfig.setHaMaxTimeSlaveNotCatchup(4 * 1000);
+ storeConfig.setHaMaxTimeSlaveNotCatchup(3 * 1000);
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setListenPort(brokerListenPort);
brokerConfig.setNamesrvAddr(namesrvAddress);
brokerConfig.setControllerAddr(controllerAddress);
brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
- brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(3 * 1000);
+ brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(2 * 1000);
brokerConfig.setEnableControllerMode(true);
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 1d6b2611d..838eb7349 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -109,10 +109,10 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
assertEquals(2, syncStateSet.getSyncStateSet().size());
- // Shut controller2
+ // Shutdown controller2
this.brokerController2.shutdown();
- Thread.sleep(5000);
+ Thread.sleep(8000);
syncStateSet = replicasManager.getSyncStateSet();
assertEquals(1, syncStateSet.getSyncStateSet().size());
}