You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/15 05:50:20 UTC
[rocketmq] branch develop updated: [ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 52c4c0cdf [ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)
52c4c0cdf is described below
commit 52c4c0cdf61f4b11d06fa70002499c3d79aedc3e
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Wed Mar 15 13:50:06 2023 +0800
[ISSUE #6346] Support asynchronously notify brokers when their roles has been changed (#6348)
* feat(controller): support asynchronously notifying brokers when roles change
1. support asynchronously notifying brokers when their roles change
* refactor(controller): move the NotifyService instance creation logic into ControllerManager's constructor
1. move the logic that creates the NotifyService instance into ControllerManager's
constructor
---
.../broker/controller/ReplicasManager.java | 2 +-
.../rocketmq/controller/ControllerManager.java | 85 +++++++++++++++++++++-
.../controller => }/ControllerManagerTest.java | 3 +-
.../{impl/controller => }/ControllerTestBase.java | 2 +-
.../impl => }/DLedgerControllerTest.java | 9 +--
.../DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../impl => }/manager/ReplicasInfoManagerTest.java | 9 +--
7 files changed, 96 insertions(+), 16 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 068187e40..d3a1c1fb8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -202,7 +202,7 @@ public class ReplicasManager {
// The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
this.sendHeartbeatToController();
if (this.masterBrokerId != null || brokerElect()) {
- LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterAddress, this.masterBrokerId);
+ LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
this.state = State.RUNNING;
this.brokerController.setIsolated(false);
LOGGER.info("All register process has been done, change state to: {}", this.state);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 18e9992d3..c4fbf0c8d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -17,9 +17,13 @@
package org.apache.rocketmq.controller;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@@ -27,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -67,6 +72,8 @@ public class ControllerManager {
private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
+ private NotifyService notifyService;
+
public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
@@ -77,6 +84,7 @@ public class ControllerManager {
this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
+ this.notifyService = new NotifyService();
}
public boolean initialize() {
@@ -93,6 +101,7 @@ public class ControllerManager {
return new FutureTaskExt<T>(runnable, value);
}
};
+ this.notifyService.initialize();
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
}
@@ -164,7 +173,7 @@ public class ControllerManager {
// Inform all active brokers
final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
- .forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry));
+ .forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
}
}
@@ -214,6 +223,7 @@ public class ControllerManager {
public void shutdown() {
this.heartbeatManager.shutdown();
this.controllerRequestExecutor.shutdown();
+ this.notifyService.shutdown();
this.controller.shutdown();
this.remotingClient.shutdown();
}
@@ -245,4 +255,77 @@ public class ControllerManager {
public Configuration getConfiguration() {
return configuration;
}
+
+    /**
+     * Delivers role-change notifications to brokers on a dedicated thread pool, so that
+     * {@link #notifyBroker(String, RoleChangeNotifyEntry)} returns without waiting for
+     * the notification itself.
+     * <p>
+     * One in-flight task is tracked per broker address. When a notification with a
+     * higher master epoch arrives for the same address, the tracked task is cancelled.
+     */
+    class NotifyService {
+        private ExecutorService executorService;
+
+        private Map<String/*brokerAddress*/, NotifyTask/*currentNotifyTask*/> currentNotifyFutures;
+
+        public NotifyService() {
+        }
+
+        /**
+         * Creates the worker pool (3 threads) and the per-broker task table.
+         * Must be called before {@link #notifyBroker(String, RoleChangeNotifyEntry)}.
+         */
+        public void initialize() {
+            this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_"));
+            this.currentNotifyFutures = new ConcurrentHashMap<>();
+        }
+
+        /**
+         * Submits an asynchronous notification of {@code entry} to the broker at
+         * {@code brokerAddress}.
+         *
+         * @param brokerAddress address of the broker to notify; also the key under which
+         *                      the in-flight task is tracked
+         * @param entry         the role-change payload; its master epoch decides whether a
+         *                      previously tracked task is cancelled
+         */
+        public void notifyBroker(String brokerAddress, RoleChangeNotifyEntry entry) {
+            int masterEpoch = entry.getMasterEpoch();
+            NotifyTask oldTask = this.currentNotifyFutures.get(brokerAddress);
+            if (oldTask != null && masterEpoch > oldTask.getMasterEpoch()) {
+                // The tracked task carries an older epoch: cancel it.
+                // The future may still be null if the submitting thread has not called setFuture yet.
+                Future<?> oldFuture = oldTask.getFuture();
+                if (oldFuture != null && !oldFuture.isDone()) {
+                    oldFuture.cancel(true);
+                }
+            }
+            final NotifyTask task = new NotifyTask(masterEpoch, null);
+            Runnable runnable = () -> {
+                try {
+                    doNotifyBrokerRoleChanged(brokerAddress, entry);
+                } finally {
+                    // Always untrack, even when the notification throws. Compare by identity:
+                    // NotifyTask.equals only looks at the epoch, so remove(key, task) could
+                    // evict a different, still-running task that carries the same epoch.
+                    this.currentNotifyFutures.computeIfPresent(brokerAddress,
+                        (address, current) -> current == task ? null : current);
+                }
+            };
+            // Track the task before submitting so the cleanup above always finds the entry.
+            this.currentNotifyFutures.put(brokerAddress, task);
+            Future<?> future = this.executorService.submit(runnable);
+            task.setFuture(future);
+        }
+
+        /**
+         * Stops the worker pool with {@code shutdownNow()}. Safe to call when
+         * {@link #initialize()} was never invoked (the constructor leaves the pool null).
+         */
+        public void shutdown() {
+            if (this.executorService != null && !this.executorService.isShutdown()) {
+                this.executorService.shutdownNow();
+            }
+        }
+
+        /**
+         * A (master epoch, future) pair describing one submitted notification.
+         * Equality and hash code are based on the master epoch only.
+         */
+        class NotifyTask extends Pair<Integer/*epochMaster*/, Future<?>/*notifyFuture*/> {
+            public NotifyTask(Integer masterEpoch, Future<?> future) {
+                super(masterEpoch, future);
+            }
+
+            public Integer getMasterEpoch() {
+                return super.getObject1();
+            }
+
+            public Future<?> getFuture() {
+                return super.getObject2();
+            }
+
+            public void setFuture(Future<?> future) {
+                super.setObject2(future);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hashCode(super.getObject1());
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) {
+                    return true;
+                }
+                if (!(obj instanceof NotifyTask)) {
+                    return false;
+                }
+                NotifyTask task = (NotifyTask) obj;
+                return Objects.equals(super.getObject1(), task.getObject1());
+            }
+        }
+    }
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
similarity index 99%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
index b7a4c328e..8ad67d404 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl.controller;
+package org.apache.rocketmq.controller;
import java.io.File;
import java.time.Duration;
@@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
similarity index 95%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
index 9b8fa757c..f77f49dcf 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl.controller;
+package org.apache.rocketmq.controller;
public class ControllerTestBase {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
similarity index 97%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
index 3bffad689..eaf78b63d 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl.controller.impl;
+package org.apache.rocketmq.controller.impl;
import io.openmessaging.storage.dledger.DLedgerConfig;
import java.io.File;
@@ -31,7 +31,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
-import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -49,9 +48,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
similarity index 97%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 74de637dd..b97ea3249 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl.controller.impl;
+package org.apache.rocketmq.controller.impl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
similarity index 98%
rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
index f677daf28..19411e778 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl.controller.impl.manager;
+package org.apache.rocketmq.controller.impl.manager;
import java.util.Arrays;
import java.util.HashSet;
@@ -29,7 +29,6 @@ import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManag
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
import org.apache.rocketmq.controller.impl.event.EventMessage;
-import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
@@ -51,9 +50,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
-import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;