You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/05 07:23:46 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Reuse dledger remotingServer in controller mode. (#4409)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 8515a8d09 [Summer of code] Reuse dledger remotingServer in controller mode. (#4409)
8515a8d09 is described below
commit 8515a8d0915d1d2e2e41a0fa3ea32ba6efc1595f
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun Jun 5 15:23:25 2022 +0800
[Summer of code] Reuse dledger remotingServer in controller mode. (#4409)
* reuse dledger remotingServer in Controller
* reuse dledger remotingServer in Namesrv when startup controller
* fix some bugs in controller
* trigger ci
* code review
* fix controllerManagerTest bug
---
.../controller/BrokerHeartbeatManager.java | 7 +++-
.../org/apache/rocketmq/controller/Controller.java | 6 +++
.../rocketmq/controller/ControllerManager.java | 43 ++++++++++++++--------
.../controller/impl/DLedgerController.java | 31 ++++++++++++----
.../impl/DefaultBrokerHeartbeatManager.java | 42 ++++++++++++++-------
.../impl/manager/ReplicasInfoManager.java | 9 +++++
.../processor/ControllerRequestProcessor.java | 1 +
.../impl/controller/ControllerManagerTest.java | 31 +++++++++-------
.../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../apache/rocketmq/namesrv/NamesrvController.java | 23 ++++++++----
.../processor/ControllerRequestProcessor.java | 13 +------
pom.xml | 2 +-
.../store/ha/autoswitch/AutoSwitchHAService.java | 10 -----
.../test/autoswitchrole/AutoSwitchRoleBase.java | 4 +-
.../AutoSwitchRoleIntegrationTest.java | 22 ++++++-----
15 files changed, 150 insertions(+), 96 deletions(-)
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index f86dbf558..e2ff58301 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -25,6 +25,11 @@ public interface BrokerHeartbeatManager {
*/
void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
+ /**
+ * Change the metadata (e.g. brokerId) of a broker.
+ */
+ void changeBrokerMetadata(final String clusterName, final String brokerAddr, final Long brokerId);
+
/**
* Start heartbeat manager.
*/
@@ -60,6 +65,6 @@ public interface BrokerHeartbeatManager {
/**
* Trigger when broker inactive.
*/
- void onBrokerInactive(final String brokerName, final String brokerAddress, final long brokerId);
+ void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, final long brokerId);
}
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index b4ed69a40..cbd11f591 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncSt
import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
@@ -100,4 +101,9 @@ public interface Controller {
* @return RemotingCommand(GetControllerMetadataResponseHeader)
*/
RemotingCommand getControllerMetadata();
+
+ /**
+ * Get the remotingServer used by the controller; the upper layer reuses this remotingServer.
+ */
+ RemotingServer getRemotingServer();
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 2a3a6b578..51e3ccb87 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -23,12 +23,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.controller.impl.DLedgerController;
@@ -38,7 +40,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -52,8 +53,6 @@ public class ControllerManager {
private final Configuration configuration;
private Controller controller;
private BrokerHeartbeatManager heartbeatManager;
- private RemotingServer remotingServer;
-
private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
@@ -84,15 +83,14 @@ public class ControllerManager {
return new FutureTaskExt<T>(runnable, value);
}
};
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
-
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
- this.controller = new DLedgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr));
+ this.controller = new DLedgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr),
+ this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService);
// Register broker inactive listener
this.heartbeatManager.addBrokerLifecycleListener(new BrokerHeartbeatManager.BrokerLifecycleListener() {
@Override
- public void onBrokerInactive(String brokerName, String brokerAddress, long brokerId) {
+ public void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) {
if (brokerId == MixAll.MASTER_ID) {
if (controller.isLeaderState()) {
final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
@@ -100,7 +98,10 @@ public class ControllerManager {
final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
if (responseHeader != null) {
- log.info("Broker{}'s master shutdown, elect a new master:{}", brokerName, responseHeader);
+ log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
+ if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+ heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+ }
}
} catch (Exception ignored) {
}
@@ -110,23 +111,29 @@ public class ControllerManager {
}
}
});
- this.registerProcessor();
+ registerProcessor();
return true;
}
public void registerProcessor() {
final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
- this.remotingServer.registerDefaultProcessor(controllerRequestProcessor, this.controllerRequestExecutor);
+ final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
+ assert controllerRemotingServer != null;
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.BROKER_HEARTBEAT, controllerRequestProcessor, this.controllerRequestExecutor);
}
public void start() {
- this.remotingServer.start();
this.heartbeatManager.start();
this.controller.startup();
}
public void shutdown() {
- this.remotingServer.shutdown();
this.heartbeatManager.shutdown();
this.controller.shutdown();
this.controllerRequestExecutor.shutdown();
@@ -144,14 +151,18 @@ public class ControllerManager {
return controller;
}
- public RemotingServer getRemotingServer() {
- return remotingServer;
- }
-
public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
+ public NettyClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+
+ public BrokerHousekeepingService getBrokerHousekeepingService() {
+ return brokerHousekeepingService;
+ }
+
public Configuration getConfiguration() {
return configuration;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 42c3d34f6..3823b9425 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -53,7 +53,11 @@ import org.apache.rocketmq.controller.impl.event.EventSerializer;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
@@ -75,24 +79,30 @@ public class DLedgerController implements Controller {
private volatile boolean isScheduling = false;
public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
- this.controllerConfig = config;
+ this(config, brokerAlivePredicate, null, null, null);
+ }
+
+ public DLedgerController(final ControllerConfig controllerConfig,
+ final BiPredicate<String, String> brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) {
+ this.controllerConfig = controllerConfig;
this.eventSerializer = new EventSerializer();
this.scheduler = new EventScheduler();
this.brokerAlivePredicate = brokerAlivePredicate;
this.dLedgerConfig = new DLedgerConfig();
- this.dLedgerConfig.setGroup(config.getControllerDLegerGroup());
- this.dLedgerConfig.setPeers(config.getControllerDLegerPeers());
- this.dLedgerConfig.setSelfId(config.getControllerDLegerSelfId());
- this.dLedgerConfig.setStoreBaseDir(config.getControllerStorePath());
- this.dLedgerConfig.setMappedFileSizeForEntryData(config.getMappedFileSize());
+ this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
+ this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
+ this.dLedgerConfig.setSelfId(controllerConfig.getControllerDLegerSelfId());
+ this.dLedgerConfig.setStoreBaseDir(controllerConfig.getControllerStorePath());
+ this.dLedgerConfig.setMappedFileSizeForEntryData(controllerConfig.getMappedFileSize());
this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
- this.replicasInfoManager = new ReplicasInfoManager(config);
+ this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
// Register statemachine and role handler.
- this.dLedgerServer = new DLedgerServer(dLedgerConfig);
+ this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
this.dLedgerServer.registerStateMachine(this.statemachine);
this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
}
@@ -175,6 +185,11 @@ public class DLedgerController implements Controller {
state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), state.isLeader(), sb.toString()));
}
+ @Override
+ public RemotingServer getRemotingServer() {
+ return this.dLedgerServer.getRemotingServer();
+ }
+
/**
* Append the request to dledger, wait the dledger to commit the request.
*/
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 7c4b08149..050f0aa6f 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -39,7 +39,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;
private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
- private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
+ private final ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
private final ControllerConfig controllerConfig;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
@@ -72,13 +72,13 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
if ((last + timeoutMillis) < System.currentTimeMillis()) {
final Channel channel = next.getValue().channel;
+ iterator.remove();
if (channel != null) {
RemotingUtil.closeChannel(channel);
}
- iterator.remove();
this.executor.submit(() ->
- notifyBrokerInActive(next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
- log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
+ notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
+ log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", next.getValue().channel, next.getKey(), timeoutMillis);
}
}
} catch (Exception e) {
@@ -86,9 +86,9 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
}
}
- private void notifyBrokerInActive(String brokerName, String brokerAddr, Long brokerId) {
+ private void notifyBrokerInActive(String clusterName, String brokerName, String brokerAddr, Long brokerId) {
for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) {
- listener.onBrokerInactive(brokerName, brokerAddr, brokerId);
+ listener.onBrokerInactive(clusterName, brokerName, brokerAddr, brokerId);
}
}
@@ -112,6 +112,16 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
}
}
+ @Override
+ public void changeBrokerMetadata(String clusterName, String brokerAddr, Long brokerId) {
+ BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+ BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+ if (prev != null) {
+ prev.brokerId = brokerId;
+ log.info("Change broker {}'s brokerId to {}", brokerAddr, brokerId);
+ }
+ }
+
@Override
public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
@@ -123,15 +133,19 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
@Override
public void onBrokerChannelClose(Channel channel) {
- synchronized (this) {
- for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
- if (entry.getValue().channel == channel) {
- this.executor.submit(() ->
- notifyBrokerInActive(entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
- break;
- }
+ BrokerAddrInfo addrInfo = null;
+ for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+ if (entry.getValue().channel == channel) {
+ log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().channel, entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId);
+ addrInfo = entry.getKey();
+ this.executor.submit(() ->
+ notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
+ break;
}
}
+ if (addrInfo != null) {
+ this.brokerLiveTable.remove(addrInfo);
+ }
}
@Override
@@ -147,9 +161,9 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
static class BrokerLiveInfo {
private final String brokerName;
- private final long brokerId;
private final long heartbeatTimeoutMillis;
private final Channel channel;
+ private long brokerId;
private long lastUpdateTimestamp;
public BrokerLiveInfo(String brokerName, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index e4b042dae..f0b680f73 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -77,6 +77,15 @@ public class ReplicasInfoManager {
final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ // Check whether the oldSyncStateSet is equal to the newSyncStateSet
+ final Set<String> oldSyncStateSet = syncStateInfo.getSyncStateSet();
+ if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) {
+ String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet";
+ log.warn("{}", err);
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+ return result;
+ }
+
// Check master
if (!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 164e9e701..a5ebe37e0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -115,6 +115,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
case BROKER_HEARTBEAT: {
final BrokerHeartbeatRequestHeader requestHeader = request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
+ return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
}
case CONTROLLER_GET_SYNC_STATE_DATA: {
if (request.getBody() != null) {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 2b8b5e976..80f3a409a 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaI
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.controller.impl.DLedgerController;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -51,8 +51,9 @@ public class ControllerManagerTest {
private List<String> baseDirs;
private List<ControllerManager> controllers;
private NettyRemotingClient remotingClient;
+ private NettyRemotingClient remotingClient1;
- public ControllerManager launchManager(final String group, final String peers, final String selfId, final int listenPort) {
+ public ControllerManager launchManager(final String group, final String peers, final String selfId) {
final String path = "/tmp" + File.separator + group + File.separator + selfId;
baseDirs.add(path);
@@ -66,7 +67,6 @@ public class ControllerManagerTest {
config.setScanNotActiveBrokerInterval(2000L);
final NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(listenPort);
final ControllerManager manager = new ControllerManager(config, serverConfig, new NettyClientConfig());
manager.initialize();
@@ -81,6 +81,8 @@ public class ControllerManagerTest {
this.controllers = new ArrayList<>();
this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
this.remotingClient.start();
+ this.remotingClient1 = new NettyRemotingClient(new NettyClientConfig());
+ this.remotingClient1.start();
}
public ControllerManager waitLeader(final List<ControllerManager> controllers) throws Exception {
@@ -105,9 +107,9 @@ public class ControllerManagerTest {
public void mockData() {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
- launchManager(group, peers, "n0", 31000);
- launchManager(group, peers, "n1", 31001);
- launchManager(group, peers, "n2", 31002);
+ launchManager(group, peers, "n0");
+ launchManager(group, peers, "n1");
+ launchManager(group, peers, "n2");
}
/**
@@ -115,13 +117,13 @@ public class ControllerManagerTest {
*/
public BrokerRegisterResponseHeader registerBroker(
final String controllerAddress, final String clusterName,
- final String brokerName, final String address) throws Exception {
+ final String brokerName, final String address, final RemotingClient client) throws Exception {
final BrokerRegisterRequestHeader requestHeader = new BrokerRegisterRequestHeader(clusterName, brokerName, address);
// Timeout = 3000
requestHeader.setHeartbeatTimeoutMillis(4000L);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
- final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+ final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
assert response != null;
switch (response.getCode()) {
case SUCCESS: {
@@ -138,14 +140,14 @@ public class ControllerManagerTest {
public void testSomeApi() throws Exception {
mockData();
final ControllerManager leader = waitLeader(this.controllers);
- String leaderAddr = RemotingUtil.getLocalAddress() + ":" + leader.getNettyServerConfig().getListenPort();
+ String leaderAddr = "localhost" + ":" + leader.getController().getRemotingServer().localListenPort();
// Register two broker, the first one is master.
- final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000");
+ final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient);
assert responseHeader1 != null;
assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
- final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001");
+ final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1);
assert responseHeader2 != null;
assertEquals(responseHeader2.getBrokerId(), 2);
@@ -157,8 +159,9 @@ public class ControllerManagerTest {
heartbeatRequestHeader.setBrokerName("broker1");
heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001");
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader);
+ System.out.println("send heartbeat success");
try {
- this.remotingClient.invokeOneway(leaderAddr, request, 3000);
+ final RemotingCommand remotingCommand = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
} catch (Exception e) {
e.printStackTrace();
}
@@ -171,7 +174,7 @@ public class ControllerManagerTest {
// The new master should be broker2.
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
- final RemotingCommand response = this.remotingClient.invokeSync(leaderAddr, request, 3000);
+ final RemotingCommand response = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
final GetReplicaInfoResponseHeader responseHeader = response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
assertEquals(responseHeader.getMasterAddress(), "127.0.0.1:8001");
@@ -187,5 +190,7 @@ public class ControllerManagerTest {
System.out.println("Delete file " + dir);
new File(dir).delete();
}
+ this.remotingClient.shutdown();
+ this.remotingClient1.shutdown();
}
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index d4de7abe4..8ec99a1e0 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -40,7 +40,7 @@ public class DefaultBrokerHeartbeatManagerTest {
@Test
public void testDetectBrokerAlive() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
- this.heartbeatManager.addBrokerLifecycleListener((brokerName, brokerAddress, brokerId) -> {
+ this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
System.out.println("Broker shutdown:" + brokerAddress);
latch.countDown();
});
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 00f9215e2..0659e5e2f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -111,8 +111,13 @@ public class NamesrvController {
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
if (controllerConfig.isEnableStartupController()) {
- this.controller = new DLedgerController(controllerConfig, this.routeInfoManager::isBrokerAlive);
- this.routeInfoManager.setController(this.controller);
+ try {
+ final NettyServerConfig controllerNettyServerConfig = (NettyServerConfig) nettyServerConfig.clone();
+ this.controller = new DLedgerController(controllerConfig, this.routeInfoManager::isBrokerAlive,
+ controllerNettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService);
+ this.routeInfoManager.setController(this.controller);
+ } catch (final CloneNotSupportedException ignored) {
+ }
}
this.configuration = new Configuration(
LOGGER,
@@ -270,13 +275,15 @@ public class NamesrvController {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
if (controllerConfig.isEnableStartupController()) {
+ final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
+ assert controllerRemotingServer != null;
final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+ controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
}
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
index 2651636e3..be00134b4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -20,14 +20,12 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.logging.InternalLogger;
@@ -102,16 +100,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
break;
}
case CONTROLLER_GET_METADATA_INFO: {
- final RemotingCommand response = this.controller.getControllerMetadata();
- final GetMetaDataResponseHeader responseHeader = (GetMetaDataResponseHeader) response.readCustomHeader();
- if (StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
- final String leaderAddress = responseHeader.getControllerLeaderAddress();
- // Because the controller is proxy by namesrv, so we should replace the controllerAddress to namesrvAddress.
- final int splitIndex = StringUtils.lastIndexOf(leaderAddress, ":");
- final String namesrvAddress = leaderAddress.substring(0, splitIndex + 1) + this.namesrvController.getNettyServerConfig().getListenPort();
- responseHeader.setControllerLeaderAddress(namesrvAddress);
- }
- return response;
+ return this.controller.getControllerMetadata();
}
case CONTROLLER_GET_SYNC_STATE_DATA: {
if (request.getBody() != null) {
diff --git a/pom.xml b/pom.xml
index 9c72fffb0..7505ce295 100644
--- a/pom.xml
+++ b/pom.xml
@@ -537,7 +537,7 @@
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
- <version>0.2.5</version>
+ <version>0.2.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 38758d663..de321e85a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -142,16 +142,6 @@ public class AutoSwitchHAService extends DefaultHAService {
@Override public void updateMasterAddress(String newAddr) {
}
- @Override public void removeConnection(HAConnection conn) {
- final Set<String> syncStateSet = getSyncStateSet();
- String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
- if (syncStateSet.contains(slave)) {
- syncStateSet.remove(slave);
- notifySyncStateSetChanged(syncStateSet);
- }
- super.removeConnection(conn);
- }
-
public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) {
this.syncStateSetChangedListeners.add(listener);
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 38172ca3d..ea1c18bcd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -73,13 +73,13 @@ public class AutoSwitchRoleBase {
return PORT_COUNTER.addAndGet(10 + random.nextInt(10));
}
- public BrokerController startBroker(String namesrvAddress, int brokerId, int haPort, int brokerListenPort,
+ public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setListenPort(brokerListenPort);
brokerConfig.setNamesrvAddr(namesrvAddress);
- brokerConfig.setControllerAddr(namesrvAddress);
+ brokerConfig.setControllerAddr(controllerAddress);
brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(4 * 1000);
brokerConfig.setEnableControllerMode(true);
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index bdf23ecbd..1d4026018 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -47,6 +47,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
private ControllerConfig controllerConfig;
private NamesrvController namesrvController;
private String namesrvAddress;
+ private String controllerAddress;
private BrokerController brokerController1;
private BrokerController brokerController2;
@@ -54,20 +55,22 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
super.initialize();
// Startup namesrv
- final String peers = String.format("n0-localhost:%d", nextPort());
+ int controllerPort = nextPort();
+ final String peers = String.format("n0-localhost:%d", controllerPort);
+
final NettyServerConfig serverConfig = new NettyServerConfig();
int namesrvPort = nextPort();
serverConfig.setListenPort(namesrvPort);
- System.out.println(namesrvPort);
this.controllerConfig = buildControllerConfig("n0", peers);
this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig(), controllerConfig);
assertTrue(namesrvController.initialize());
namesrvController.start();
this.namesrvAddress = "127.0.0.1:" + namesrvPort + ";";
+ this.controllerAddress = "127.0.0.1:" + controllerPort + ";";
- this.brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
- this.brokerController2 = startBroker(this.namesrvAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
+ this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
+ this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
// Wait slave connecting to master
assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
@@ -77,7 +80,6 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
System.out.println("Begin test");
final MessageStore messageStore = brokerController1.getMessageStore();
putMessage(messageStore);
-
// Check slave message
checkMessage(brokerController2.getMessageStore(), 10, 0);
}
@@ -90,7 +92,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
return true;
} else {
System.out.println("slave not ready");
- Thread.sleep(1000);
+ Thread.sleep(2000);
tryTimes++;
}
}
@@ -123,7 +125,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
// Restart old master, it should be slave
- brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+ brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
waitSlaveReady(brokerController1.getMessageStore());
assertFalse(brokerController1.getReplicasManager().isMasterState());
@@ -142,7 +144,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
init(defaultFileSize);
mockData();
- BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+ BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
waitSlaveReady(broker3.getMessageStore());
Thread.sleep(6000);
@@ -169,7 +171,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
// Step3: add broker3
- BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+ BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker3.getMessageStore());
Thread.sleep(6000);
checkMessage(broker3.getMessageStore(), 10, 0);
@@ -197,7 +199,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
checkMessage(broker2MessageStore, 10, 10);
// Step6, start broker4, link to broker2, it should sync msg from epoch2(offset = 1700).
- BrokerController broker4 = startBroker(this.namesrvAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+ BrokerController broker4 = startBroker(this.namesrvAddress, this.controllerAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker4.getMessageStore());
Thread.sleep(6000);
checkMessage(broker4.getMessageStore(), 10, 10);