You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/06 09:01:10 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new a3670944e Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable
a3670944e is described below

commit a3670944e19f5c7d193c73fa7343968da4feda5e
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon Jun 6 17:00:54 2022 +0800

    Fix bug that do not remove caughtUpTime in connectionCaughtUpTimeTable
---
 .../rocketmq/broker/hacontroller/ReplicasManager.java |  9 ++++-----
 .../java/org/apache/rocketmq/common/BrokerConfig.java |  6 +-----
 .../apache/rocketmq/controller/ControllerManager.java |  2 +-
 .../apache/rocketmq/namesrv/NamesrvController.java    |  3 +++
 .../store/ha/autoswitch/AutoSwitchHAService.java      | 19 +++++++++----------
 .../test/autoswitchrole/AutoSwitchRoleBase.java       |  4 ++--
 .../autoswitchrole/AutoSwitchRoleIntegrationTest.java |  4 ++--
 7 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index a5c141938..a25924e84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -206,10 +206,7 @@ public class ReplicasManager {
                 handleSlaveSynchronize(BrokerRole.SLAVE);
 
                 // Notify ha service, change to slave
-                if (!this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId())) {
-                    LOGGER.info("Failed to change ha role to slave");
-                    return;
-                }
+                this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
 
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
@@ -358,6 +355,9 @@ public class ReplicasManager {
      * Scheduling check syncStateSet.
      */
     private void schedulingCheckSyncStateSet() {
+        if (this.checkSyncStateSetTaskFuture != null) {
+            this.checkSyncStateSetTaskFuture.cancel(false);
+        }
         this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
             final Set<String> newSyncStateSet = this.haService.maybeShrinkInSyncStateSet();
             newSyncStateSet.add(this.localAddress);
@@ -388,7 +388,6 @@ public class ReplicasManager {
     private void stopCheckSyncStateSet() {
         if (this.checkSyncStateSetTaskFuture != null) {
             this.checkSyncStateSetTaskFuture.cancel(false);
-            this.checkSyncStateSetTaskFuture = null;
         }
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 10072249d..5bf85f86b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -305,15 +305,11 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean controllerDeployedStandAlone = false;
 
-    /**
-     * If isControllerDeployedStandAlone = false, controllerAddr should be the addresses of name-srv which running the controller instance.
-     * If isControllerDeployedStandAlone = true, controllerAddr should be controller's address.
-     */
     private String controllerAddr = "";
 
     private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
 
-    private long replicasManagerCheckSyncStateSetPeriod = 8 * 1000;
+    private long replicasManagerCheckSyncStateSetPeriod = 5 * 1000;
 
     private long replicasManagerSyncControllerMetadataPeriod = 10 * 1000;
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 51e3ccb87..01f87903d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -135,8 +135,8 @@ public class ControllerManager {
 
     public void shutdown() {
         this.heartbeatManager.shutdown();
-        this.controller.shutdown();
         this.controllerRequestExecutor.shutdown();
+        this.controller.shutdown();
     }
 
     public BrokerHeartbeatManager getHeartbeatManager() {
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 0659e5e2f..08da5d0a4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -311,6 +311,9 @@ public class NamesrvController {
         if (this.controllerRequestExecutor != null) {
             this.controllerRequestExecutor.shutdown();
         }
+        if (this.controllerConfig.isEnableStartupController()) {
+            this.controller.shutdown();
+        }
         this.scheduledExecutorService.shutdown();
         this.scanExecutorService.shutdown();
         this.routeInfoManager.shutdown();
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index e1446b3f7..3c908393d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -80,6 +80,7 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override public boolean changeToMaster(int masterEpoch) {
         final int lastEpoch = this.epochCache.lastEpoch();
         if (masterEpoch < lastEpoch) {
+            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to master", masterEpoch, lastEpoch);
             return false;
         }
         destroyConnections();
@@ -110,6 +111,7 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slaveId) {
         final int lastEpoch = this.epochCache.lastEpoch();
         if (newMasterEpoch < lastEpoch) {
+            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to slave", newMasterEpoch, lastEpoch);
             return false;
         }
         try {
@@ -158,20 +160,18 @@ public class AutoSwitchHAService extends DefaultHAService {
     }
 
     /**
-     * Check and maybe shrink the inSyncStateSet.
-     * A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
+     * Check and maybe shrink the inSyncStateSet. A slave will be removed from inSyncStateSet if (curTime -
+     * HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
      */
     public Set<String> maybeShrinkInSyncStateSet() {
-        final Set<String> currentSyncStateSet = getSyncStateSet();
-        final HashSet<String> newSyncStateSet = new HashSet<>(currentSyncStateSet);
+        final Set<String> newSyncStateSet = getSyncStateSet();
         final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
-        for (Map.Entry<String, Long> next: this.connectionCaughtUpTimeTable.entrySet()) {
+        for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) {
             final String slaveAddress = next.getKey();
-            if (currentSyncStateSet.contains(slaveAddress)) {
+            if (newSyncStateSet.contains(slaveAddress)) {
                 final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress);
                 if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) {
                     newSyncStateSet.remove(slaveAddress);
-                    this.connectionCaughtUpTimeTable.remove(slaveAddress);
                 }
             }
         }
@@ -179,9 +179,8 @@ public class AutoSwitchHAService extends DefaultHAService {
     }
 
     /**
-     * Check and maybe add the slave to inSyncStateSet.
-     * A slave will be added to inSyncStateSet if its slaveMaxOffset >= current confirmOffset, and it is caught up to
-     * an offset within the current leader epoch.
+     * Check and maybe add the slave to inSyncStateSet. A slave will be added to inSyncStateSet if its slaveMaxOffset >=
+     * current confirmOffset, and it is caught up to an offset within the current leader epoch.
      */
     public void maybeExpandInSyncStateSet(final String slaveAddress, final long slaveMaxOffset) {
         final Set<String> currentSyncStateSet = getSyncStateSet();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index c7b515584..d9968b610 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -76,13 +76,13 @@ public class AutoSwitchRoleBase {
     public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
         int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
         final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
-        storeConfig.setHaMaxTimeSlaveNotCatchup(4 * 1000);
+        storeConfig.setHaMaxTimeSlaveNotCatchup(3 * 1000);
         final BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setListenPort(brokerListenPort);
         brokerConfig.setNamesrvAddr(namesrvAddress);
         brokerConfig.setControllerAddr(controllerAddress);
         brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
-        brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(3 * 1000);
+        brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(2 * 1000);
         brokerConfig.setEnableControllerMode(true);
 
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 1d6b2611d..838eb7349 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -109,10 +109,10 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
         assertEquals(2, syncStateSet.getSyncStateSet().size());
 
-        // Shut controller2
+        // Shutdown controller2
         this.brokerController2.shutdown();
 
-        Thread.sleep(5000);
+        Thread.sleep(8000);
         syncStateSet = replicasManager.getSyncStateSet();
         assertEquals(1, syncStateSet.getSyncStateSet().size());
     }