You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/15 05:50:20 UTC

[rocketmq] branch develop updated: [ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 52c4c0cdf [ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)
52c4c0cdf is described below

commit 52c4c0cdf61f4b11d06fa70002499c3d79aedc3e
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Wed Mar 15 13:50:06 2023 +0800

    [ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)
    
    * feat(controller): support asynchronous notify brokers when roles changed
    
    1. support asynchronous notify brokers when roles changed
    
    * refactor(controller): move creating NotifyService instance logic to ControllerManager's constructor method
    
    1. move creating NotifyService instance logic to ControllerManager's
    constructor method
---
 .../broker/controller/ReplicasManager.java         |  2 +-
 .../rocketmq/controller/ControllerManager.java     | 85 +++++++++++++++++++++-
 .../controller => }/ControllerManagerTest.java     |  3 +-
 .../{impl/controller => }/ControllerTestBase.java  |  2 +-
 .../impl => }/DLedgerControllerTest.java           |  9 +--
 .../DefaultBrokerHeartbeatManagerTest.java         |  2 +-
 .../impl => }/manager/ReplicasInfoManagerTest.java |  9 +--
 7 files changed, 96 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 068187e40..d3a1c1fb8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -202,7 +202,7 @@ public class ReplicasManager {
             // The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
             this.sendHeartbeatToController();
             if (this.masterBrokerId != null || brokerElect()) {
-                LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterAddress, this.masterBrokerId);
+                LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
                 this.state = State.RUNNING;
                 this.brokerController.setIsolated(false);
                 LOGGER.info("All register process has been done, change state to: {}", this.state);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 18e9992d3..c4fbf0c8d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -17,9 +17,13 @@
 package org.apache.rocketmq.controller;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -27,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -67,6 +72,8 @@ public class ControllerManager {
     private ExecutorService controllerRequestExecutor;
     private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
+    private NotifyService notifyService;
+
     public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
         NettyClientConfig nettyClientConfig) {
         this.controllerConfig = controllerConfig;
@@ -77,6 +84,7 @@ public class ControllerManager {
         this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
         this.remotingClient = new NettyRemotingClient(nettyClientConfig);
         this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
+        this.notifyService = new NotifyService();
     }
 
     public boolean initialize() {
@@ -93,6 +101,7 @@ public class ControllerManager {
                 return new FutureTaskExt<T>(runnable, value);
             }
         };
+        this.notifyService.initialize();
         if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
             throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
         }
@@ -164,7 +173,7 @@ public class ControllerManager {
             // Inform all active brokers
             final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
             brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
-                    .forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry));
+                    .forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
         }
     }
 
@@ -214,6 +223,7 @@ public class ControllerManager {
     public void shutdown() {
         this.heartbeatManager.shutdown();
         this.controllerRequestExecutor.shutdown();
+        this.notifyService.shutdown();
         this.controller.shutdown();
         this.remotingClient.shutdown();
     }
@@ -245,4 +255,77 @@ public class ControllerManager {
     public Configuration getConfiguration() {
         return configuration;
     }
+
+    class NotifyService {
+        private ExecutorService executorService;
+
+        private Map<String/*brokerAddress*/, NotifyTask/*currentNotifyTask*/> currentNotifyFutures;
+
+        public NotifyService() {
+        }
+
+        public void initialize() {
+            this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_"));
+            this.currentNotifyFutures = new ConcurrentHashMap<>();
+        }
+
+        public void notifyBroker(String brokerAddress, RoleChangeNotifyEntry entry) {
+            int masterEpoch = entry.getMasterEpoch();
+            NotifyTask oldTask = this.currentNotifyFutures.get(brokerAddress);
+            if (oldTask != null && masterEpoch > oldTask.getMasterEpoch()) {
+                // cancel current future
+                Future oldFuture = oldTask.getFuture();
+                if (oldFuture != null && !oldFuture.isDone()) {
+                    oldFuture.cancel(true);
+                }
+            }
+            final NotifyTask task = new NotifyTask(masterEpoch, null);
+            Runnable runnable = () -> {
+                doNotifyBrokerRoleChanged(brokerAddress, entry);
+                this.currentNotifyFutures.remove(brokerAddress, task);
+            };
+            this.currentNotifyFutures.put(brokerAddress, task);
+            Future<?> future = this.executorService.submit(runnable);
+            task.setFuture(future);
+        }
+
+        public void shutdown() {
+            if (!this.executorService.isShutdown()) {
+                this.executorService.shutdownNow();
+            }
+        }
+
+        class NotifyTask extends Pair<Integer/*epochMaster*/, Future/*notifyFuture*/> {
+            public NotifyTask(Integer masterEpoch, Future future) {
+                super(masterEpoch, future);
+            }
+
+            public Integer getMasterEpoch() {
+                return super.getObject1();
+            }
+
+            public Future getFuture() {
+                return super.getObject2();
+            }
+
+            public void setFuture(Future future) {
+                super.setObject2(future);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hashCode(super.getObject1());
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (!(obj instanceof NotifyTask)) {
+                    return false;
+                }
+                NotifyTask task = (NotifyTask) obj;
+                return super.getObject1().equals(task.getObject1());
+            }
+        }
+    }
 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
similarity index 99%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
index b7a4c328e..8ad67d404 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.impl.controller;
+package org.apache.rocketmq.controller;
 
 import java.io.File;
 import java.time.Duration;
@@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
similarity index 95%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
index 9b8fa757c..f77f49dcf 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.controller.impl.controller;
+package org.apache.rocketmq.controller;
 
 public class ControllerTestBase {
 
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
similarity index 97%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
index 3bffad689..eaf78b63d 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.impl.controller.impl;
+package org.apache.rocketmq.controller.impl;
 
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import java.io.File;
@@ -31,7 +31,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
-import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -49,9 +48,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
similarity index 97%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 74de637dd..b97ea3249 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.impl.controller.impl;
+package org.apache.rocketmq.controller.impl;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
similarity index 98%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
index f677daf28..19411e778 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.impl.controller.impl.manager;
+package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -29,7 +29,6 @@ import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManag
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
-import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
@@ -51,9 +50,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;