You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/14 11:48:18 UTC
[rocketmq] branch develop updated: [ISSUE #5695] Optimize broker startup (#5696)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aa7a44250 [ISSUE #5695] Optimize broker startup (#5696)
aa7a44250 is described below
commit aa7a442505ac012d1bc61b89bf10c41646a15005
Author: lizhimins <70...@qq.com>
AuthorDate: Wed Dec 14 19:48:00 2022 +0800
[ISSUE #5695] Optimize broker startup (#5696)
Co-authored-by: 斜阳 <te...@alibaba-inc.com>
---
.../apache/rocketmq/broker/BrokerController.java | 6 +-
.../org/apache/rocketmq/broker/BrokerStartup.java | 272 +++++++++++----------
.../broker/filtersrv/FilterServerManager.java | 4 +-
.../subscription/SubscriptionGroupManager.java | 9 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 11 +-
5 files changed, 158 insertions(+), 144 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b584e8769..0a581b0c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -192,7 +192,6 @@ public class BrokerController {
protected final PullRequestHoldService pullRequestHoldService;
protected final MessageArrivingListener messageArrivingListener;
protected final Broker2Client broker2Client;
- protected final SubscriptionGroupManager subscriptionGroupManager;
protected final ConsumerIdsChangeListener consumerIdsChangeListener;
protected final EndTransactionProcessor endTransactionProcessor;
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
@@ -224,6 +223,7 @@ public class BrokerController {
protected CountDownLatch remotingServerStartLatch;
protected RemotingServer fastRemotingServer;
protected TopicConfigManager topicConfigManager;
+ protected SubscriptionGroupManager subscriptionGroupManager;
protected TopicQueueMappingManager topicQueueMappingManager;
protected ExecutorService sendMessageExecutor;
protected ExecutorService pullMessageExecutor;
@@ -1216,6 +1216,10 @@ public class BrokerController {
return pullRequestHoldService;
}
+ public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
+ this.subscriptionGroupManager = subscriptionGroupManager;
+ }
+
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index e30c021b0..dfa2ab517 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -17,14 +17,16 @@
package org.apache.rocketmq.broker;
import java.io.BufferedInputStream;
-import java.io.FileInputStream;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -40,9 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
public class BrokerStartup {
- public static Properties properties = null;
- public static CommandLine commandLine = null;
- public static String configFile = null;
+
public static Logger log;
public static final SystemConfigFileHelper CONFIG_FILE_HELPER = new SystemConfigFileHelper();
@@ -52,11 +52,11 @@ public class BrokerStartup {
public static BrokerController start(BrokerController controller) {
try {
-
controller.start();
- String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
- + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ String tip = String.format("The broker[%s, %s] boot success. serializeType=%s",
+ controller.getBrokerConfig().getBrokerName(), controller.getBrokerAddr(),
+ RemotingCommand.getSerializeTypeConfigInThisServer());
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
@@ -79,163 +79,165 @@ public class BrokerStartup {
}
}
- public static BrokerController createBrokerController(String[] args) {
+ public static BrokerController buildBrokerController(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
- try {
- //PackageConflictDetect.detectFastjson();
- Options options = ServerUtil.buildCommandlineOptions(new Options());
- commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
- new DefaultParser());
- if (null == commandLine) {
- System.exit(-1);
- }
-
- final BrokerConfig brokerConfig = new BrokerConfig();
- final NettyServerConfig nettyServerConfig = new NettyServerConfig();
- final NettyClientConfig nettyClientConfig = new NettyClientConfig();
- nettyServerConfig.setListenPort(10911);
- final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ final BrokerConfig brokerConfig = new BrokerConfig();
+ final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ nettyServerConfig.setListenPort(10911);
- if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
- int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
- messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
- }
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ CommandLine commandLine = ServerUtil.parseCmdLine(
+ "mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
- if (commandLine.hasOption('c')) {
- String file = commandLine.getOptionValue('c');
- if (file != null) {
- CONFIG_FILE_HELPER.setFile(file);
- configFile = file;
- BrokerPathConfigHelper.setBrokerConfigPath(file);
- properties = CONFIG_FILE_HELPER.loadConfig();
- }
+ Properties properties = null;
+ if (commandLine.hasOption('c')) {
+ String file = commandLine.getOptionValue('c');
+ if (file != null) {
+ CONFIG_FILE_HELPER.setFile(file);
+ BrokerPathConfigHelper.setBrokerConfigPath(file);
+ properties = CONFIG_FILE_HELPER.loadConfig();
}
+ }
- if (properties != null) {
- properties2SystemEnv(properties);
- MixAll.properties2Object(properties, brokerConfig);
- MixAll.properties2Object(properties, nettyServerConfig);
- MixAll.properties2Object(properties, nettyClientConfig);
- MixAll.properties2Object(properties, messageStoreConfig);
- }
+ if (properties != null) {
+ properties2SystemEnv(properties);
+ MixAll.properties2Object(properties, brokerConfig);
+ MixAll.properties2Object(properties, nettyServerConfig);
+ MixAll.properties2Object(properties, nettyClientConfig);
+ MixAll.properties2Object(properties, messageStoreConfig);
+ }
- MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+ if (null == brokerConfig.getRocketmqHome()) {
+ System.out.printf("Please set the %s variable in your environment " +
+ "to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+ System.exit(-2);
+ }
- if (null == brokerConfig.getRocketmqHome()) {
- System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
- System.exit(-2);
+ // Validate namesrvAddr
+ String namesrvAddr = brokerConfig.getNamesrvAddr();
+ if (StringUtils.isNotBlank(namesrvAddr)) {
+ try {
+ String[] addrArray = namesrvAddr.split(";");
+ for (String addr : addrArray) {
+ NetworkUtil.string2SocketAddress(addr);
+ }
+ } catch (Exception e) {
+ System.out.printf("The Name Server Address[%s] illegal, please set it as follows, " +
+ "\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
+ System.exit(-3);
}
+ }
- String namesrvAddr = brokerConfig.getNamesrvAddr();
- if (null != namesrvAddr) {
- try {
- String[] addrArray = namesrvAddr.split(";");
- for (String addr : addrArray) {
- NetworkUtil.string2SocketAddress(addr);
+ if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
+ int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
+ messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
+ }
+
+ // Set broker role according to ha config
+ if (!brokerConfig.isEnableControllerMode()) {
+ switch (messageStoreConfig.getBrokerRole()) {
+ case ASYNC_MASTER:
+ case SYNC_MASTER:
+ brokerConfig.setBrokerId(MixAll.MASTER_ID);
+ break;
+ case SLAVE:
+ if (brokerConfig.getBrokerId() <= MixAll.MASTER_ID) {
+ System.out.printf("Slave's brokerId must be > 0%n");
+ System.exit(-3);
}
- } catch (Exception e) {
- System.out.printf(
- "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
- namesrvAddr);
- System.exit(-3);
- }
+ break;
+ default:
+ break;
}
+ }
- if (!brokerConfig.isEnableControllerMode()) {
- switch (messageStoreConfig.getBrokerRole()) {
- case ASYNC_MASTER:
- case SYNC_MASTER:
- brokerConfig.setBrokerId(MixAll.MASTER_ID);
- break;
- case SLAVE:
- if (brokerConfig.getBrokerId() <= 0) {
- System.out.printf("Slave's brokerId must be > 0");
- System.exit(-3);
- }
-
- break;
- default:
- break;
- }
- }
+ if (messageStoreConfig.isEnableDLegerCommitLog()) {
+ brokerConfig.setBrokerId(-1);
+ }
+ messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
+ brokerConfig.setInBrokerContainer(false);
- if (messageStoreConfig.isEnableDLegerCommitLog()) {
- brokerConfig.setBrokerId(-1);
- }
+ System.setProperty("brokerLogDir", "");
+ if (brokerConfig.isIsolateLogEnable()) {
+ System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
+ }
+ if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
+ System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
+ }
- messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
- System.setProperty("brokerLogDir", "");
- if (brokerConfig.isIsolateLogEnable()) {
- System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
- }
- if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
- System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
- }
+ if (commandLine.hasOption('p')) {
+ Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+ MixAll.printObjectProperties(console, brokerConfig);
+ MixAll.printObjectProperties(console, nettyServerConfig);
+ MixAll.printObjectProperties(console, nettyClientConfig);
+ MixAll.printObjectProperties(console, messageStoreConfig);
+ System.exit(0);
+ } else if (commandLine.hasOption('m')) {
+ Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+ MixAll.printObjectProperties(console, brokerConfig, true);
+ MixAll.printObjectProperties(console, nettyServerConfig, true);
+ MixAll.printObjectProperties(console, nettyClientConfig, true);
+ MixAll.printObjectProperties(console, messageStoreConfig, true);
+ System.exit(0);
+ }
- if (commandLine.hasOption('p')) {
- Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
- MixAll.printObjectProperties(console, brokerConfig);
- MixAll.printObjectProperties(console, nettyServerConfig);
- MixAll.printObjectProperties(console, nettyClientConfig);
- MixAll.printObjectProperties(console, messageStoreConfig);
- System.exit(0);
- } else if (commandLine.hasOption('m')) {
- Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
- MixAll.printObjectProperties(console, brokerConfig, true);
- MixAll.printObjectProperties(console, nettyServerConfig, true);
- MixAll.printObjectProperties(console, nettyClientConfig, true);
- MixAll.printObjectProperties(console, messageStoreConfig, true);
- System.exit(0);
- }
+ log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ MixAll.printObjectProperties(log, brokerConfig);
+ MixAll.printObjectProperties(log, nettyServerConfig);
+ MixAll.printObjectProperties(log, nettyClientConfig);
+ MixAll.printObjectProperties(log, messageStoreConfig);
- log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- MixAll.printObjectProperties(log, brokerConfig);
- MixAll.printObjectProperties(log, nettyServerConfig);
- MixAll.printObjectProperties(log, nettyClientConfig);
- MixAll.printObjectProperties(log, messageStoreConfig);
+ final BrokerController controller = new BrokerController(
+ brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
- brokerConfig.setInBrokerContainer(false);
+ // Remember all configs to prevent discard
+ controller.getConfiguration().registerConfig(properties);
+
+ return controller;
+ }
- final BrokerController controller = new BrokerController(
- brokerConfig,
- nettyServerConfig,
- nettyClientConfig,
- messageStoreConfig);
- // remember all configs to prevent discard
- controller.getConfiguration().registerConfig(properties);
+ public static Runnable buildShutdownHook(BrokerController brokerController) {
+ return new Runnable() {
+ private volatile boolean hasShutdown = false;
+ private final AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
+ if (!this.hasShutdown) {
+ this.hasShutdown = true;
+ long beginTime = System.currentTimeMillis();
+ brokerController.shutdown();
+ long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+ log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
+ }
+ }
+ }
+ };
+ }
+ public static BrokerController createBrokerController(String[] args) {
+ try {
+ BrokerController controller = buildBrokerController(args);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
-
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- private volatile boolean hasShutdown = false;
- private AtomicInteger shutdownTimes = new AtomicInteger(0);
-
- @Override
- public void run() {
- synchronized (this) {
- log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
- if (!this.hasShutdown) {
- this.hasShutdown = true;
- long beginTime = System.currentTimeMillis();
- controller.shutdown();
- long consumingTimeTotal = System.currentTimeMillis() - beginTime;
- log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
- }
- }
- }
- }, "ShutdownHook"));
-
+ Runtime.getRuntime().addShutdownHook(new Thread(buildShutdownHook(controller)));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
-
return null;
}
@@ -274,7 +276,7 @@ public class BrokerStartup {
}
public Properties loadConfig() throws Exception {
- InputStream in = new BufferedInputStream(new FileInputStream(file));
+ InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
Properties properties = new Properties();
properties.load(in);
in.close();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index f6628a158..57497f904 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -77,8 +77,8 @@ public class FilterServerManager {
private String buildStartCommand() {
String config = "";
- if (BrokerStartup.configFile != null) {
- config = String.format("-c %s", BrokerStartup.configFile);
+ if (BrokerStartup.CONFIG_FILE_HELPER.getFile() != null) {
+ config = String.format("-c %s", BrokerStartup.CONFIG_FILE_HELPER.getFile());
}
if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 8b3bb3d49..b9e0780cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -35,10 +35,10 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
public class SubscriptionGroupManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+ private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<>(1024);
- private final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable =
+ private ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable =
new ConcurrentHashMap<>(4);
private final DataVersion dataVersion = new DataVersion();
@@ -273,6 +273,11 @@ public class SubscriptionGroupManager extends ConfigManager {
return forbiddenTable;
}
+ public void setForbiddenTable(
+ ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable) {
+ this.forbiddenTable = forbiddenTable;
+ }
+
public DataVersion getDataVersion() {
return dataVersion;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 87bf6b445..0e5d5370d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -56,10 +56,8 @@ public class TopicConfigManager extends ConfigManager {
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
private transient final Lock topicConfigTableLock = new ReentrantLock();
-
- private final ConcurrentMap<String, TopicConfig> topicConfigTable =
- new ConcurrentHashMap<>(1024);
- private final DataVersion dataVersion = new DataVersion();
+ private ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(1024);
+ private DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
public TopicConfigManager() {
@@ -606,6 +604,11 @@ public class TopicConfigManager extends ConfigManager {
return dataVersion;
}
+ public void setTopicConfigTable(
+ ConcurrentMap<String, TopicConfig> topicConfigTable) {
+ this.topicConfigTable = topicConfigTable;
+ }
+
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
return topicConfigTable;
}