You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/18 12:17:03 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Stand alone a new controller module (#4333)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 4e9e4e3ef [Summer of code] Stand alone a new controller module (#4333)
4e9e4e3ef is described below
commit 4e9e4e3efb3e75fd82ef37782d3714115533a880
Author: hzh0425 <64...@qq.com>
AuthorDate: Wed May 18 20:16:43 2022 +0800
[Summer of code] Stand alone a new controller module (#4333)
* feature: add new module controller
* feature: add heartbeat manager
* feature: link requestProcessor and heartbeatmanager
* feature: add controllerManager and startup
* feature: remove namesrv's duplicate controller code
* review code
---
.../org/apache/rocketmq/common/BrokerAddrInfo.java | 78 +++++++++
.../rocketmq/common/namesrv/ControllerConfig.java | 33 ++++
.../controller/BrokerRegisterRequestHeader.java | 12 ++
{namesrv => controller}/pom.xml | 21 +--
.../controller/BrokerHeartbeatManager.java | 65 +++++++
.../controller/BrokerHousekeepingService.java | 51 ++++++
.../apache/rocketmq}/controller/Controller.java | 3 +-
.../rocketmq/controller/ControllerManager.java | 162 +++++++++++++++++
.../rocketmq/controller/ControllerStartup.java | 164 ++++++++++++++++++
.../impl/DefaultBrokerHeartbeatManager.java | 174 +++++++++++++++++++
.../controller/impl/DledgerController.java | 24 +--
.../impl/DledgerControllerStateMachine.java | 8 +-
.../impl}/event/AlterSyncStateSetEvent.java | 2 +-
.../controller/impl}/event/ApplyBrokerIdEvent.java | 2 +-
.../controller/impl}/event/ControllerResult.java | 2 +-
.../controller/impl}/event/ElectMasterEvent.java | 2 +-
.../controller/impl}/event/EventMessage.java | 2 +-
.../controller/impl}/event/EventSerializer.java | 2 +-
.../rocketmq/controller/impl}/event/EventType.java | 2 +-
.../controller/impl}/manager/BrokerInfo.java | 2 +-
.../impl}/manager/InSyncReplicasInfo.java | 2 +-
.../impl}/manager/ReplicasInfoManager.java | 14 +-
.../processor/ControllerRequestProcessor.java | 29 +++-
.../impl/controller/ControllerManagerTest.java | 191 +++++++++++++++++++++
.../impl/DefaultBrokerHeartbeatManagerTest.java | 52 ++++++
.../controller/impl/DledgerControllerTest.java | 16 +-
.../impl}/manager/ReplicasInfoManagerTest.java | 13 +-
controller/src/test/resources/logback-test.xml | 33 ++++
namesrv/pom.xml | 4 +
.../apache/rocketmq/namesrv/NamesrvController.java | 17 +-
.../processor/ControllerRequestProcessor.java | 4 +-
.../namesrv/routeinfo/RouteInfoManager.java | 76 +-------
pom.xml | 6 +
33 files changed, 1117 insertions(+), 151 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
new file mode 100644
index 000000000..cd122c83a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public class BrokerAddrInfo {
+ private final String clusterName;
+ private final String brokerAddr;
+
+ private int hash;
+
+ public BrokerAddrInfo(String clusterName, String brokerAddr) {
+ this.clusterName = clusterName;
+ this.brokerAddr = brokerAddr;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public boolean isEmpty() {
+ return clusterName.isEmpty() && brokerAddr.isEmpty();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj instanceof BrokerAddrInfo) {
+ BrokerAddrInfo addr = (BrokerAddrInfo) obj;
+ return clusterName.equals(addr.clusterName) && brokerAddr.equals(addr.brokerAddr);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int h = hash;
+ if (h == 0 && clusterName.length() + brokerAddr.length() > 0) {
+ for (int i = 0; i < clusterName.length(); i++) {
+ h = 31 * h + clusterName.charAt(i);
+ }
+ h = 31 * h + '_';
+ for (int i = 0; i < brokerAddr.length(); i++) {
+ h = 31 * h + brokerAddr.charAt(i);
+ }
+ hash = h;
+ }
+ return h;
+ }
+
+ @Override
+ public String toString() {
+ return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" + brokerAddr + "]";
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
index 4d4525aef..1d73c9d02 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
@@ -17,14 +17,23 @@
package org.apache.rocketmq.common.namesrv;
import java.io.File;
+import org.apache.rocketmq.common.MixAll;
public class ControllerConfig {
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties";
+
/**
* Is startup the controller in this name-srv
*/
private boolean isStartupController = false;
+ /**
+ * Interval of periodic scanning for non-active broker;
+ */
+ private long scanNotActiveBrokerInterval = 5 * 1000;
+
/**
* Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.
*/
@@ -51,6 +60,22 @@ public class ControllerConfig {
*/
private boolean isProcessReadEvent = false;
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+ public String getConfigStorePath() {
+ return configStorePath;
+ }
+
+ public void setConfigStorePath(String configStorePath) {
+ this.configStorePath = configStorePath;
+ }
+
public boolean isStartupController() {
return isStartupController;
}
@@ -59,6 +84,14 @@ public class ControllerConfig {
isStartupController = startupController;
}
+ public long getScanNotActiveBrokerInterval() {
+ return scanNotActiveBrokerInterval;
+ }
+
+ public void setScanNotActiveBrokerInterval(long scanNotActiveBrokerInterval) {
+ this.scanNotActiveBrokerInterval = scanNotActiveBrokerInterval;
+ }
+
public int getControllerThreadPoolNums() {
return controllerThreadPoolNums;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
index 55f84efc2..a29e01e49 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
@@ -17,12 +17,16 @@
package org.apache.rocketmq.common.protocol.header.namesrv.controller;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class BrokerRegisterRequestHeader implements CommandCustomHeader {
private String clusterName;
private String brokerName;
private String brokerAddress;
+ @CFNullable
+ private Long heartbeatTimeoutMillis;
+
public BrokerRegisterRequestHeader() {
}
@@ -57,6 +61,14 @@ public class BrokerRegisterRequestHeader implements CommandCustomHeader {
this.brokerAddress = brokerAddress;
}
+ public Long getHeartbeatTimeoutMillis() {
+ return heartbeatTimeoutMillis;
+ }
+
+ public void setHeartbeatTimeoutMillis(Long heartbeatTimeoutMillis) {
+ this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+ }
+
@Override public String toString() {
return "RegisterBrokerRequestHeader{" +
"clusterName='" + clusterName + '\'' +
diff --git a/namesrv/pom.xml b/controller/pom.xml
similarity index 82%
copy from namesrv/pom.xml
copy to controller/pom.xml
index 83b11d1c5..920ea0de2 100644
--- a/namesrv/pom.xml
+++ b/controller/pom.xml
@@ -17,15 +17,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
<version>5.0.0-BETA-SNAPSHOT</version>
</parent>
-
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
- <artifactId>rocketmq-namesrv</artifactId>
- <name>rocketmq-namesrv ${project.version}</name>
+ <artifactId>rocketmq-controller</artifactId>
+ <name>rocketmq-controller ${project.version}</name>
<dependencies>
<dependency>
@@ -57,21 +56,9 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-core</artifactId>
- <version>1.19</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-generator-annprocess</artifactId>
- <version>1.19</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
new file mode 100644
index 000000000..f86dbf558
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import io.netty.channel.Channel;
+
+public interface BrokerHeartbeatManager {
+
+ /**
+ * Broker new heartbeat.
+ */
+ void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
+
+ /**
+ * Start heartbeat manager.
+ */
+ void start();
+
+ /**
+ * Shutdown heartbeat manager.
+ */
+ void shutdown();
+
+ /**
+ * Add BrokerLifecycleListener.
+ */
+ void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
+
+ /**
+ * Register new broker to heartManager.
+ */
+ void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
+ final Long timeoutMillis, final Channel channel);
+
+ /**
+ * Broker channel close
+ */
+ void onBrokerChannelClose(final Channel channel);
+
+ /**
+ * Check whether broker active
+ */
+ boolean isBrokerActive(final String clusterName, final String brokerAddr);
+
+ interface BrokerLifecycleListener {
+ /**
+ * Trigger when broker inactive.
+ */
+ void onBrokerInactive(final String brokerName, final String brokerAddress, final long brokerId);
+ }
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
new file mode 100644
index 000000000..50c96cfd3
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+public class BrokerHousekeepingService implements ChannelEventListener {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+ private final ControllerManager controllerManager;
+
+ public BrokerHousekeepingService(ControllerManager controllerManager) {
+ this.controllerManager = controllerManager;
+ }
+
+ @Override
+ public void onChannelConnect(String remoteAddr, Channel channel) {
+ }
+
+ @Override
+ public void onChannelClose(String remoteAddr, Channel channel) {
+ this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+ }
+
+ @Override
+ public void onChannelException(String remoteAddr, Channel channel) {
+ this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+ }
+
+ @Override
+ public void onChannelIdle(String remoteAddr, Channel channel) {
+ this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
similarity index 98%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
rename to controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 44f2cdf2c..ec45a8009 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller;
+
+package org.apache.rocketmq.controller;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
new file mode 100644
index 000000000..7b835c5c0
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
+import org.apache.rocketmq.controller.impl.DledgerController;
+import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class ControllerManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+ private final ControllerConfig controllerConfig;
+ private final NettyServerConfig nettyServerConfig;
+ private final NettyClientConfig nettyClientConfig;
+ private final BrokerHousekeepingService brokerHousekeepingService;
+ private final Configuration configuration;
+ private Controller controller;
+ private BrokerHeartbeatManager heartbeatManager;
+ private RemotingClient remotingClient;
+ private RemotingServer remotingServer;
+
+ private ExecutorService controllerRequestExecutor;
+ private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
+
+ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
+ NettyClientConfig nettyClientConfig) {
+ this.controllerConfig = controllerConfig;
+ this.nettyServerConfig = nettyServerConfig;
+ this.nettyClientConfig = nettyClientConfig;
+ this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+ this.configuration = new Configuration(
+ log,
+ this.controllerConfig, this.nettyServerConfig
+ );
+ this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
+ }
+
+ public boolean initialize() {
+ this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
+ this.controllerRequestExecutor = new ThreadPoolExecutor(
+ this.controllerConfig.getControllerThreadPoolNums(),
+ this.controllerConfig.getControllerThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.controllerRequestThreadPoolQueue,
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+ return new FutureTaskExt<T>(runnable, value);
+ }
+ };
+ this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+ this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+ this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress() + ":" + this.nettyServerConfig.getListenPort()));
+
+ this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
+ this.controller = new DledgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr));
+
+ // Register broker inactive listener
+ this.heartbeatManager.addBrokerLifecycleListener(new BrokerHeartbeatManager.BrokerLifecycleListener() {
+ @Override
+ public void onBrokerInactive(String brokerName, String brokerAddress, long brokerId) {
+ if (brokerId == MixAll.MASTER_ID) {
+ final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+ try {
+ final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+ if (responseHeader != null) {
+ log.info("Broker{}'s master shutdown, elect a new master:{}", brokerName, responseHeader);
+ }
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ });
+ this.registerProcessor();
+ return true;
+ }
+
+ public void registerProcessor() {
+ final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
+ this.remotingServer.registerDefaultProcessor(controllerRequestProcessor, this.controllerRequestExecutor);
+ }
+
+ public void start() {
+ this.remotingServer.start();
+ this.remotingClient.start();
+ this.heartbeatManager.start();
+ this.controller.startup();
+ }
+
+ public void shutdown() {
+ this.remotingServer.shutdown();
+ this.heartbeatManager.shutdown();
+ this.controller.shutdown();
+ this.controllerRequestExecutor.shutdown();
+ }
+
+ public BrokerHeartbeatManager getHeartbeatManager() {
+ return heartbeatManager;
+ }
+
+ public ControllerConfig getControllerConfig() {
+ return controllerConfig;
+ }
+
+ public Controller getController() {
+ return controller;
+ }
+
+ public RemotingServer getRemotingServer() {
+ return remotingServer;
+ }
+
+ public NettyServerConfig getNettyServerConfig() {
+ return nettyServerConfig;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
new file mode 100644
index 000000000..3820e64e6
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.srvutil.ShutdownHookThread;
+import org.slf4j.LoggerFactory;
+
+public class ControllerStartup {
+
+ private static InternalLogger log;
+ private static Properties properties = null;
+ private static CommandLine commandLine = null;
+
+ public static void main(String[] args) {
+ main0(args);
+ }
+
+ public static ControllerManager main0(String[] args) {
+
+ try {
+ ControllerManager controller = createControllerManager(args);
+ start(controller);
+ String tip = "The Controller Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ log.info(tip);
+ System.out.printf("%s%n", tip);
+ return controller;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ return null;
+ }
+
+ public static ControllerManager createControllerManager(String[] args) throws IOException, JoranException {
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ commandLine = ServerUtil.parseCmdLine("mqcontroller", args, buildCommandlineOptions(options), new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ return null;
+ }
+
+ final ControllerConfig controllerConfig = new ControllerConfig();
+ final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ nettyServerConfig.setListenPort(19876);
+
+ if (commandLine.hasOption('c')) {
+ String file = commandLine.getOptionValue('c');
+ if (file != null) {
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
+ properties = new Properties();
+ properties.load(in);
+ MixAll.properties2Object(properties, controllerConfig);
+ MixAll.properties2Object(properties, nettyServerConfig);
+ MixAll.properties2Object(properties, nettyClientConfig);
+
+ System.out.printf("load config properties file OK, %s%n", file);
+ in.close();
+ }
+ }
+
+ if (commandLine.hasOption('p')) {
+ MixAll.printObjectProperties(null, controllerConfig);
+ MixAll.printObjectProperties(null, nettyServerConfig);
+ MixAll.printObjectProperties(null, nettyClientConfig);
+ System.exit(0);
+ }
+
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), controllerConfig);
+
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ configurator.doConfigure(controllerConfig.getRocketmqHome() + "/conf/logback_controller.xml");
+
+ log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+ MixAll.printObjectProperties(log, controllerConfig);
+ MixAll.printObjectProperties(log, nettyServerConfig);
+
+ final ControllerManager controllerManager = new ControllerManager(controllerConfig, nettyServerConfig, nettyClientConfig);
+ // remember all configs to prevent discard
+ controllerManager.getConfiguration().registerConfig(properties);
+
+ return controllerManager;
+ }
+
+ public static ControllerManager start(final ControllerManager controller) throws Exception {
+
+ if (null == controller) {
+ throw new IllegalArgumentException("ControllerManager is null");
+ }
+
+ boolean initResult = controller.initialize();
+ if (!initResult) {
+ controller.shutdown();
+ System.exit(-3);
+ }
+
+ Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
+ controller.shutdown();
+ return null;
+ }));
+
+ controller.start();
+
+ return controller;
+ }
+
+ public static void shutdown(final ControllerManager controller) {
+ controller.shutdown();
+ }
+
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("c", "configFile", true, "Controller config properties file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("p", "printConfigItem", false, "Print all config items");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
new file mode 100644
index 000000000..7c4b08149
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.BrokerAddrInfo;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+ private static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;
+ private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
+
+ private final ControllerConfig controllerConfig;
+ private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
+ private final List<BrokerLifecycleListener> brokerLifecycleListeners;
+
+ public DefaultBrokerHeartbeatManager(final ControllerConfig controllerConfig) {
+ this.controllerConfig = controllerConfig;
+ this.brokerLiveTable = new ConcurrentHashMap<>(256);
+ this.brokerLifecycleListeners = new ArrayList<>();
+ }
+
+ @Override
+ public void start() {
+ this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 2000, this.controllerConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdown() {
+ this.scheduledService.shutdown();
+ this.executor.shutdown();
+ }
+
+ public void scanNotActiveBroker() {
+ try {
+ log.info("start scanNotActiveBroker");
+ final Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> iterator = this.brokerLiveTable.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = iterator.next();
+ long last = next.getValue().lastUpdateTimestamp;
+ long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
+ if ((last + timeoutMillis) < System.currentTimeMillis()) {
+ final Channel channel = next.getValue().channel;
+ if (channel != null) {
+ RemotingUtil.closeChannel(channel);
+ }
+ iterator.remove();
+ this.executor.submit(() ->
+ notifyBrokerInActive(next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
+ log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
+ }
+ }
+ } catch (Exception e) {
+ log.error("scanNotActiveBroker exception", e);
+ }
+ }
+
+ private void notifyBrokerInActive(String brokerName, String brokerAddr, Long brokerId) {
+ for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) {
+ listener.onBrokerInactive(brokerName, brokerAddr, brokerId);
+ }
+ }
+
+ @Override
+ public void addBrokerLifecycleListener(BrokerLifecycleListener listener) {
+ this.brokerLifecycleListeners.add(listener);
+ }
+
+ @Override
+ public void registerBroker(String clusterName, String brokerName, String brokerAddr,
+ long brokerId, Long timeoutMillis, Channel channel) {
+ final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+ final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
+ new BrokerLiveInfo(brokerName,
+ brokerId,
+ System.currentTimeMillis(),
+ timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+ channel));
+ if (prevBrokerLiveInfo == null) {
+ log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
+ }
+ }
+
+ @Override
+ public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
+ BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+ BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+ if (prev != null) {
+ prev.lastUpdateTimestamp = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public void onBrokerChannelClose(Channel channel) {
+ synchronized (this) {
+ for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+ if (entry.getValue().channel == channel) {
+ this.executor.submit(() ->
+ notifyBrokerInActive(entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isBrokerActive(String clusterName, String brokerAddr) {
+ final BrokerLiveInfo info = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddr));
+ if (info != null) {
+ long last = info.lastUpdateTimestamp;
+ long timeoutMillis = info.heartbeatTimeoutMillis;
+ return (last + timeoutMillis) >= System.currentTimeMillis();
+ }
+ return false;
+ }
+
+ static class BrokerLiveInfo {
+ private final String brokerName;
+ private final long brokerId;
+ private final long heartbeatTimeoutMillis;
+ private final Channel channel;
+ private long lastUpdateTimestamp;
+
+ public BrokerLiveInfo(String brokerName, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+ Channel channel) {
+ this.brokerName = brokerName;
+ this.brokerId = brokerId;
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+ this.channel = channel;
+ }
+
+ @Override public String toString() {
+ return "BrokerLiveInfo{" +
+ "brokerName='" + brokerName + '\'' +
+ ", brokerId=" + brokerId +
+ ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+ ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ ", channel=" + channel +
+ '}';
+ }
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
similarity index 95%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
index a7ac9d592..d057e705d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
@@ -45,14 +45,13 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegis
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventSerializer;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.namesrv.controller.Controller;
-import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -65,10 +64,8 @@ public class DledgerController implements Controller {
private final DLedgerServer dLedgerServer;
private final ControllerConfig controllerConfig;
private final DLedgerConfig dLedgerConfig;
- private final NamesrvController namesrvController;
// Usr for checking whether the broker is alive
private final BiPredicate<String, String> brokerAlivePredicate;
-
private final ReplicasInfoManager replicasInfoManager;
private final EventScheduler scheduler;
private final EventSerializer eventSerializer;
@@ -76,16 +73,11 @@ public class DledgerController implements Controller {
private final DledgerControllerStateMachine statemachine;
private volatile boolean isScheduling = false;
- public DledgerController(final ControllerConfig config, final NamesrvController namesrvController) {
+ public DledgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
this.controllerConfig = config;
this.eventSerializer = new EventSerializer();
this.scheduler = new EventScheduler();
- this.namesrvController = namesrvController;
- if (namesrvController == null) {
- this.brokerAlivePredicate = (cluster, address) -> true;
- } else {
- this.brokerAlivePredicate = (cluster, address) -> namesrvController.getRouteInfoManager().isBrokerAlive(cluster, address);
- }
+ this.brokerAlivePredicate = brokerAlivePredicate;
this.dLedgerConfig = new DLedgerConfig();
this.dLedgerConfig.setGroup(config.getControllerDLegerGroup());
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
similarity index 91%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
index 3e21d29dd..54e93c5f2 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
@@ -23,11 +23,11 @@ import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventSerializer;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
/**
* The state machine implementation of the dledger controller
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
index 4da71e32b..5156b5b52 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
import java.util.HashSet;
import java.util.Set;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
index fc9860dbe..06108b57c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
/**
* The event trys to apply a new id for a new broker.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
index 5d61f92af..41af2af04 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
import java.util.ArrayList;
import java.util.List;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
index 34aaeca86..f91c869e8 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
/**
* The event trys to elect a new master for target broker.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
similarity index 94%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
index 768fc0dd6..8a31393e1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
/**
* The parent class of Event, the subclass needs to indicate eventType.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
index 38d12a93a..f78f399a5 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
import org.apache.commons.lang3.SerializationException;
import org.apache.rocketmq.common.utils.FastJsonSerializer;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
index 9496d0a3e..f6971cbdf 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
/**
* Event type (name, id);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index ef2efeaba..1b0242e0d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
import java.util.HashMap;
import java.util.HashSet;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
index 8b8c5afd4..b0b7b0c3c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
import java.util.HashSet;
import java.util.Set;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index c81b0a122..85ac2734f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,14 +35,14 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
/**
* The manager that manages the replicas info for all brokers.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
similarity index 75%
copy from namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
copy to controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index aed048047..bf185c826 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.processor;
+package org.apache.rocketmq.controller.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
@@ -22,18 +22,23 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import static org.apache.rocketmq.common.protocol.RequestCode.BROKER_HEARTBEAT;
import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
@@ -47,9 +52,11 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
private static final int WAIT_TIMEOUT_OUT = 5;
private final Controller controller;
+ private final BrokerHeartbeatManager heartbeatManager;
- public ControllerRequestProcessor(final Controller controller) {
- this.controller = controller;
+ public ControllerRequestProcessor(final ControllerManager controllerManager) {
+ this.controller = controllerManager.getController();
+ this.heartbeatManager = controllerManager.getHeartbeatManager();
}
@Override
@@ -82,7 +89,13 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final BrokerRegisterRequestHeader controllerRequest = request.decodeCommandCustomHeader(BrokerRegisterRequestHeader.class);
final CompletableFuture<RemotingCommand> future = this.controller.registerBroker(controllerRequest);
if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final BrokerRegisterResponseHeader responseHeader = (BrokerRegisterResponseHeader) response.readCustomHeader();
+ if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
+ this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+ responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+ }
+ return response;
}
break;
}
@@ -97,6 +110,10 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
case CONTROLLER_GET_METADATA_INFO: {
return this.controller.getControllerMetadata();
}
+ case BROKER_HEARTBEAT: {
+ final BrokerHeartbeatRequestHeader requestHeader = request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+ this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
+ }
default: {
final String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
new file mode 100644
index 000000000..7df1633a9
--- /dev/null
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.controller.impl.DledgerController;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.junit.Assert.assertEquals;
+
+public class ControllerManagerTest {
+ private List<String> baseDirs;
+ private List<ControllerManager> controllers;
+ private NettyRemotingClient remotingClient;
+
+ public ControllerManager launchManager(final String group, final String peers, final String selfId, final int listenPort) {
+ final String path = "/tmp" + File.separator + group + File.separator + selfId;
+ baseDirs.add(path);
+
+ final ControllerConfig config = new ControllerConfig();
+ config.setControllerDLegerGroup(group);
+ config.setControllerDLegerPeers(peers);
+ config.setControllerDLegerSelfId(selfId);
+ config.setControllerStorePath(path);
+ config.setMappedFileSize(10 * 1024 * 1024);
+ config.setEnableElectUncleanMaster(true);
+ config.setScanNotActiveBrokerInterval(2000L);
+
+ final NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(listenPort);
+
+ final ControllerManager manager = new ControllerManager(config, serverConfig, new NettyClientConfig());
+ manager.initialize();
+ manager.start();
+ this.controllers.add(manager);
+ return manager;
+ }
+
+ @Before
+ public void startup() {
+ this.baseDirs = new ArrayList<>();
+ this.controllers = new ArrayList<>();
+ this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
+ this.remotingClient.start();
+ }
+
+ public ControllerManager waitLeader(final List<ControllerManager> controllers) throws Exception {
+ if (controllers.isEmpty()) {
+ return null;
+ }
+ DledgerController c1 = (DledgerController) controllers.get(0).getController();
+ while (c1.getMemberState().getLeaderId() == null) {
+ Thread.sleep(1000);
+ }
+ String leaderId = c1.getMemberState().getLeaderId();
+ System.out.println("New leader " + leaderId);
+ for (ControllerManager controllerManager : controllers) {
+ final DledgerController controller = (DledgerController) controllerManager.getController();
+ if (controller.getMemberState().getSelfId().equals(leaderId)) {
+ return controllerManager;
+ }
+ }
+ return null;
+ }
+
+ public void mockData() {
+ String group = UUID.randomUUID().toString();
+ String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
+ launchManager(group, peers, "n0", 31000);
+ launchManager(group, peers, "n1", 31001);
+ launchManager(group, peers, "n2", 31002);
+ }
+
+ /**
+ * Register broker to controller
+ */
+ public BrokerRegisterResponseHeader registerBroker(
+ final String controllerAddress, final String clusterName,
+ final String brokerName, final String address) throws Exception {
+
+ final BrokerRegisterRequestHeader requestHeader = new BrokerRegisterRequestHeader(clusterName, brokerName, address);
+ // Timeout = 3000
+ requestHeader.setHeartbeatTimeoutMillis(4000L);
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
+ final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+ assert response != null;
+ switch (response.getCode()) {
+ case SUCCESS: {
+ return response.decodeCommandCustomHeader(BrokerRegisterResponseHeader.class);
+ }
+ case CONTROLLER_NOT_LEADER: {
+ throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+ }
+ }
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+ @Test
+ public void testSomeApi() throws Exception {
+ mockData();
+ final ControllerManager leader = waitLeader(this.controllers);
+ String leaderAddr = RemotingUtil.getLocalAddress() + ":" + leader.getNettyServerConfig().getListenPort();
+
+ // Register two broker, the first one is master.
+ final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000");
+ assert responseHeader1 != null;
+ assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
+
+ final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001");
+ assert responseHeader2 != null;
+ assertEquals(responseHeader2.getBrokerId(), 2);
+
+ // Send heartbeat for broker2
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(()-> {
+ final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new BrokerHeartbeatRequestHeader();
+ heartbeatRequestHeader.setClusterName("cluster1");
+ heartbeatRequestHeader.setBrokerName("broker1");
+ heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001");
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader);
+ try {
+ this.remotingClient.invokeOneway(leaderAddr, request, 3000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }, 0, 2000L, TimeUnit.MILLISECONDS);
+
+
+ // Wait until the master is expired.
+ Thread.sleep(6000);
+
+ // The new master should be broker2.
+ final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
+ final RemotingCommand response = this.remotingClient.invokeSync(leaderAddr, request, 3000);
+ final GetReplicaInfoResponseHeader responseHeader = response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
+ assertEquals(responseHeader.getMasterAddress(), "127.0.0.1:8001");
+
+ executor.shutdown();
+ }
+
+ @After
+ public void tearDown() {
+ for (ControllerManager controller : this.controllers) {
+ controller.shutdown();
+ }
+ for (String dir : this.baseDirs) {
+ System.out.println("Delete file " + dir);
+ new File(dir).delete();
+ }
+ }
+}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
new file mode 100644
index 000000000..d4de7abe4
--- /dev/null
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class DefaultBrokerHeartbeatManagerTest {
+ private BrokerHeartbeatManager heartbeatManager;
+
+ @Before
+ public void init() {
+ final ControllerConfig config = new ControllerConfig();
+ config.setScanNotActiveBrokerInterval(2000);
+ this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
+ this.heartbeatManager.start();
+ }
+
+ @Test
+ public void testDetectBrokerAlive() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ this.heartbeatManager.addBrokerLifecycleListener((brokerName, brokerAddress, brokerId) -> {
+ System.out.println("Broker shutdown:" + brokerAddress);
+ latch.countDown();
+ });
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null);
+ assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+ this.heartbeatManager.shutdown();
+ }
+
+}
\ No newline at end of file
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
similarity index 98%
rename from namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
index 77384aa3e..3bc9e360f 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl.controller.impl;
import io.openmessaging.storage.dledger.DLedgerConfig;
import java.io.File;
@@ -28,13 +28,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
-import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.DledgerController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.After;
@@ -48,11 +49,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-/**
- * @author hzh
- * @email 642256541@qq.com
- * @date 2022/4/20 11:05
- */
public class DledgerControllerTest {
private List<String> baseDirs;
private List<DledgerController> controllers;
@@ -69,7 +65,7 @@ public class DledgerControllerTest {
config.setMappedFileSize(10 * 1024 * 1024);
config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
- final DledgerController controller = new DledgerController(config, null);
+ final DledgerController controller = new DledgerController(config, (str1, str2) -> true);
controller.startup();
return controller;
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
similarity index 95%
rename from namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 656ffe471..fc7d420f5 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.controller.impl.manager;
import java.util.HashSet;
import java.util.List;
@@ -23,15 +23,16 @@ import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Before;
import org.junit.Test;
diff --git a/controller/src/test/resources/logback-test.xml b/controller/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..eb12a9a05
--- /dev/null
+++ b/controller/src/test/resources/logback-test.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+ <append>true</append>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <root>
+ <level value="OFF"/>
+ <appender-ref ref="DefaultAppender"/>
+ </root>
+</configuration>
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 83b11d1c5..1ce091429 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -33,6 +33,10 @@
<artifactId>dledger</artifactId>
<version>0.2.5</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-controller</artifactId>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index e10878e6b..0b5e39d64 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -33,10 +33,10 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.DledgerController;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
-import org.apache.rocketmq.namesrv.controller.impl.DledgerController;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
@@ -109,10 +109,11 @@ public class NamesrvController {
this.controllerConfig = controllerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+ this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
if (controllerConfig.isStartupController()) {
- this.controller = new DledgerController(controllerConfig, this);
+ this.controller = new DledgerController(controllerConfig, this.routeInfoManager::isBrokerAlive);
+ this.routeInfoManager.setController(this.controller);
}
- this.routeInfoManager = new RouteInfoManager(namesrvConfig, this, this.controller);
this.configuration = new Configuration(
LOGGER,
this.namesrvConfig, this.nettyServerConfig
@@ -168,7 +169,6 @@ public class NamesrvController {
return new FutureTaskExt<T>(runnable, value);
}
};
- this.controller.startup();
}
this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
@@ -289,6 +289,10 @@ public class NamesrvController {
}
this.routeInfoManager.start();
+
+ if (this.controllerConfig.isStartupController()) {
+ this.controller.startup();
+ }
}
public void shutdown() {
@@ -296,6 +300,9 @@ public class NamesrvController {
this.remotingServer.shutdown();
this.defaultExecutor.shutdown();
this.clientRequestExecutor.shutdown();
+ if (this.controllerRequestExecutor != null) {
+ this.controllerRequestExecutor.shutdown();
+ }
this.scheduledExecutorService.shutdown();
this.scanExecutorService.shutdown();
this.routeInfoManager.shutdown();
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
index aed048047..01e75803d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -23,12 +23,12 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 516a94564..cb9c113ed 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.rocketmq.common.BrokerAddrInfo;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -54,10 +55,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.namesrv.controller.Controller;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -82,11 +83,6 @@ public class RouteInfoManager {
private final NamesrvConfig namesrvConfig;
private Controller controller;
- public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController, final Controller controller) {
- this(namesrvConfig, namesrvController);
- this.controller = controller;
- }
-
public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
this.topicQueueTable = new ConcurrentHashMap<String, Map<String, QueueData>>(1024);
this.brokerAddrTable = new ConcurrentHashMap<String, BrokerData>(128);
@@ -607,8 +603,6 @@ public class RouteInfoManager {
if (this.namesrvController != null && this.namesrvController.getControllerConfig().isStartupController() && this.controller != null) {
if (unRegisterRequest.getBrokerId() == 0) {
this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
- // Todo: Inform the master
- // However, because now the broker does not have the related api, so I will complete the process in the future.
}
}
}
@@ -1134,69 +1128,13 @@ public class RouteInfoManager {
}
return false;
}
-}
-
-/**
- * broker address information
- */
-class BrokerAddrInfo {
- private String clusterName;
- private String brokerAddr;
-
- private int hash;
-
- public BrokerAddrInfo(String clusterName, String brokerAddr) {
- this.clusterName = clusterName;
- this.brokerAddr = brokerAddr;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public String getBrokerAddr() {
- return brokerAddr;
- }
-
- public boolean isEmpty() {
- return clusterName.isEmpty() && brokerAddr.isEmpty();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
-
- if (obj instanceof BrokerAddrInfo) {
- BrokerAddrInfo addr = (BrokerAddrInfo) obj;
- return clusterName.equals(addr.clusterName) && brokerAddr.equals(addr.brokerAddr);
- }
- return false;
- }
- @Override
- public int hashCode() {
- int h = hash;
- if (h == 0 && clusterName.length() + brokerAddr.length() > 0) {
- for (int i = 0; i < clusterName.length(); i++) {
- h = 31 * h + clusterName.charAt(i);
- }
- h = 31 * h + '_';
- for (int i = 0; i < brokerAddr.length(); i++) {
- h = 31 * h + brokerAddr.charAt(i);
- }
- hash = h;
- }
- return h;
+ public void setController(Controller controller) {
+ this.controller = controller;
}
- @Override
- public String toString() {
- return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" + brokerAddr + "]";
+ public Controller getController() {
+ return controller;
}
}
@@ -1257,6 +1195,8 @@ class BrokerLiveInfo {
this.haServerAddr = haServerAddr;
}
+
+
@Override
public String toString() {
return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion
diff --git a/pom.xml b/pom.xml
index 08e97dd68..8bfef0986 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
<module>acl</module>
<module>example</module>
<module>container</module>
+ <module>controller</module>
</modules>
<build>
@@ -453,6 +454,11 @@
<dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-controller</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>