You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/14 11:48:18 UTC

[rocketmq] branch develop updated: [ISSUE #5695] Optimize broker startup (#5696)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aa7a44250 [ISSUE #5695] Optimize broker startup (#5696)
aa7a44250 is described below

commit aa7a442505ac012d1bc61b89bf10c41646a15005
Author: lizhimins <70...@qq.com>
AuthorDate: Wed Dec 14 19:48:00 2022 +0800

    [ISSUE #5695] Optimize broker startup (#5696)
    
    Co-authored-by: 斜阳 <te...@alibaba-inc.com>
---
 .../apache/rocketmq/broker/BrokerController.java   |   6 +-
 .../org/apache/rocketmq/broker/BrokerStartup.java  | 272 +++++++++++----------
 .../broker/filtersrv/FilterServerManager.java      |   4 +-
 .../subscription/SubscriptionGroupManager.java     |   9 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  |  11 +-
 5 files changed, 158 insertions(+), 144 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b584e8769..0a581b0c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -192,7 +192,6 @@ public class BrokerController {
     protected final PullRequestHoldService pullRequestHoldService;
     protected final MessageArrivingListener messageArrivingListener;
     protected final Broker2Client broker2Client;
-    protected final SubscriptionGroupManager subscriptionGroupManager;
     protected final ConsumerIdsChangeListener consumerIdsChangeListener;
     protected final EndTransactionProcessor endTransactionProcessor;
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
@@ -224,6 +223,7 @@ public class BrokerController {
     protected CountDownLatch remotingServerStartLatch;
     protected RemotingServer fastRemotingServer;
     protected TopicConfigManager topicConfigManager;
+    protected SubscriptionGroupManager subscriptionGroupManager;
     protected TopicQueueMappingManager topicQueueMappingManager;
     protected ExecutorService sendMessageExecutor;
     protected ExecutorService pullMessageExecutor;
@@ -1216,6 +1216,10 @@ public class BrokerController {
         return pullRequestHoldService;
     }
 
+    public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
+        this.subscriptionGroupManager = subscriptionGroupManager;
+    }
+
     public SubscriptionGroupManager getSubscriptionGroupManager() {
         return subscriptionGroupManager;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index e30c021b0..dfa2ab517 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -17,14 +17,16 @@
 package org.apache.rocketmq.broker;
 
 import java.io.BufferedInputStream;
-import java.io.FileInputStream;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -40,9 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
 public class BrokerStartup {
-    public static Properties properties = null;
-    public static CommandLine commandLine = null;
-    public static String configFile = null;
+
     public static Logger log;
     public static final SystemConfigFileHelper CONFIG_FILE_HELPER = new SystemConfigFileHelper();
 
@@ -52,11 +52,11 @@ public class BrokerStartup {
 
     public static BrokerController start(BrokerController controller) {
         try {
-
             controller.start();
 
-            String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
-                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+            String tip = String.format("The broker[%s, %s] boot success. serializeType=%s",
+                controller.getBrokerConfig().getBrokerName(), controller.getBrokerAddr(),
+                RemotingCommand.getSerializeTypeConfigInThisServer());
 
             if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                 tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
@@ -79,163 +79,165 @@ public class BrokerStartup {
         }
     }
 
-    public static BrokerController createBrokerController(String[] args) {
+    public static BrokerController buildBrokerController(String[] args) throws Exception {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
 
-        try {
-            //PackageConflictDetect.detectFastjson();
-            Options options = ServerUtil.buildCommandlineOptions(new Options());
-            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
-                new DefaultParser());
-            if (null == commandLine) {
-                System.exit(-1);
-            }
-
-            final BrokerConfig brokerConfig = new BrokerConfig();
-            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
-            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
-            nettyServerConfig.setListenPort(10911);
-            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        final BrokerConfig brokerConfig = new BrokerConfig();
+        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        nettyServerConfig.setListenPort(10911);
 
-            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
-                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
-                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
-            }
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine(
+            "mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
+        if (null == commandLine) {
+            System.exit(-1);
+        }
 
-            if (commandLine.hasOption('c')) {
-                String file = commandLine.getOptionValue('c');
-                if (file != null) {
-                    CONFIG_FILE_HELPER.setFile(file);
-                    configFile = file;
-                    BrokerPathConfigHelper.setBrokerConfigPath(file);
-                    properties = CONFIG_FILE_HELPER.loadConfig();
-                }
+        Properties properties = null;
+        if (commandLine.hasOption('c')) {
+            String file = commandLine.getOptionValue('c');
+            if (file != null) {
+                CONFIG_FILE_HELPER.setFile(file);
+                BrokerPathConfigHelper.setBrokerConfigPath(file);
+                properties = CONFIG_FILE_HELPER.loadConfig();
             }
+        }
 
-            if (properties != null) {
-                properties2SystemEnv(properties);
-                MixAll.properties2Object(properties, brokerConfig);
-                MixAll.properties2Object(properties, nettyServerConfig);
-                MixAll.properties2Object(properties, nettyClientConfig);
-                MixAll.properties2Object(properties, messageStoreConfig);
-            }
+        if (properties != null) {
+            properties2SystemEnv(properties);
+            MixAll.properties2Object(properties, brokerConfig);
+            MixAll.properties2Object(properties, nettyServerConfig);
+            MixAll.properties2Object(properties, nettyClientConfig);
+            MixAll.properties2Object(properties, messageStoreConfig);
+        }
 
-            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+        if (null == brokerConfig.getRocketmqHome()) {
+            System.out.printf("Please set the %s variable in your environment " +
+                "to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+            System.exit(-2);
+        }
 
-            if (null == brokerConfig.getRocketmqHome()) {
-                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
-                System.exit(-2);
+        // Validate namesrvAddr
+        String namesrvAddr = brokerConfig.getNamesrvAddr();
+        if (StringUtils.isNotBlank(namesrvAddr)) {
+            try {
+                String[] addrArray = namesrvAddr.split(";");
+                for (String addr : addrArray) {
+                    NetworkUtil.string2SocketAddress(addr);
+                }
+            } catch (Exception e) {
+                System.out.printf("The Name Server Address[%s] illegal, please set it as follows, " +
+                        "\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
+                System.exit(-3);
             }
+        }
 
-            String namesrvAddr = brokerConfig.getNamesrvAddr();
-            if (null != namesrvAddr) {
-                try {
-                    String[] addrArray = namesrvAddr.split(";");
-                    for (String addr : addrArray) {
-                        NetworkUtil.string2SocketAddress(addr);
+        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
+            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
+            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
+        }
+
+        // Set broker role according to ha config
+        if (!brokerConfig.isEnableControllerMode()) {
+            switch (messageStoreConfig.getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                    break;
+                case SLAVE:
+                    if (brokerConfig.getBrokerId() <= MixAll.MASTER_ID) {
+                        System.out.printf("Slave's brokerId must be > 0%n");
+                        System.exit(-3);
                     }
-                } catch (Exception e) {
-                    System.out.printf(
-                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
-                        namesrvAddr);
-                    System.exit(-3);
-                }
+                    break;
+                default:
+                    break;
             }
+        }
 
-            if (!brokerConfig.isEnableControllerMode()) {
-                switch (messageStoreConfig.getBrokerRole()) {
-                    case ASYNC_MASTER:
-                    case SYNC_MASTER:
-                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
-                        break;
-                    case SLAVE:
-                        if (brokerConfig.getBrokerId() <= 0) {
-                            System.out.printf("Slave's brokerId must be > 0");
-                            System.exit(-3);
-                        }
-
-                        break;
-                    default:
-                        break;
-                }
-            }
+        if (messageStoreConfig.isEnableDLegerCommitLog()) {
+            brokerConfig.setBrokerId(-1);
+        }
+        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
+        brokerConfig.setInBrokerContainer(false);
 
-            if (messageStoreConfig.isEnableDLegerCommitLog()) {
-                brokerConfig.setBrokerId(-1);
-            }
+        System.setProperty("brokerLogDir", "");
+        if (brokerConfig.isIsolateLogEnable()) {
+            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
+        }
+        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
+            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
+        }
 
-            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
-            System.setProperty("brokerLogDir", "");
-            if (brokerConfig.isIsolateLogEnable()) {
-                System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
-            }
-            if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
-                System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
-            }
+        if (commandLine.hasOption('p')) {
+            Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+            MixAll.printObjectProperties(console, brokerConfig);
+            MixAll.printObjectProperties(console, nettyServerConfig);
+            MixAll.printObjectProperties(console, nettyClientConfig);
+            MixAll.printObjectProperties(console, messageStoreConfig);
+            System.exit(0);
+        } else if (commandLine.hasOption('m')) {
+            Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+            MixAll.printObjectProperties(console, brokerConfig, true);
+            MixAll.printObjectProperties(console, nettyServerConfig, true);
+            MixAll.printObjectProperties(console, nettyClientConfig, true);
+            MixAll.printObjectProperties(console, messageStoreConfig, true);
+            System.exit(0);
+        }
 
-            if (commandLine.hasOption('p')) {
-                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
-                MixAll.printObjectProperties(console, brokerConfig);
-                MixAll.printObjectProperties(console, nettyServerConfig);
-                MixAll.printObjectProperties(console, nettyClientConfig);
-                MixAll.printObjectProperties(console, messageStoreConfig);
-                System.exit(0);
-            } else if (commandLine.hasOption('m')) {
-                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
-                MixAll.printObjectProperties(console, brokerConfig, true);
-                MixAll.printObjectProperties(console, nettyServerConfig, true);
-                MixAll.printObjectProperties(console, nettyClientConfig, true);
-                MixAll.printObjectProperties(console, messageStoreConfig, true);
-                System.exit(0);
-            }
+        log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+        MixAll.printObjectProperties(log, brokerConfig);
+        MixAll.printObjectProperties(log, nettyServerConfig);
+        MixAll.printObjectProperties(log, nettyClientConfig);
+        MixAll.printObjectProperties(log, messageStoreConfig);
 
-            log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-            MixAll.printObjectProperties(log, brokerConfig);
-            MixAll.printObjectProperties(log, nettyServerConfig);
-            MixAll.printObjectProperties(log, nettyClientConfig);
-            MixAll.printObjectProperties(log, messageStoreConfig);
+        final BrokerController controller = new BrokerController(
+            brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
 
-            brokerConfig.setInBrokerContainer(false);
+        // Remember all configs to prevent discard
+        controller.getConfiguration().registerConfig(properties);
+
+        return controller;
+    }
 
-            final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
-            // remember all configs to prevent discard
-            controller.getConfiguration().registerConfig(properties);
+    public static Runnable buildShutdownHook(BrokerController brokerController) {
+        return new Runnable() {
+            private volatile boolean hasShutdown = false;
+            private final AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+            @Override
+            public void run() {
+                synchronized (this) {
+                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
+                    if (!this.hasShutdown) {
+                        this.hasShutdown = true;
+                        long beginTime = System.currentTimeMillis();
+                        brokerController.shutdown();
+                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
+                    }
+                }
+            }
+        };
+    }
 
+    public static BrokerController createBrokerController(String[] args) {
+        try {
+            BrokerController controller = buildBrokerController(args);
             boolean initResult = controller.initialize();
             if (!initResult) {
                 controller.shutdown();
                 System.exit(-3);
             }
-
-            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-                private volatile boolean hasShutdown = false;
-                private AtomicInteger shutdownTimes = new AtomicInteger(0);
-
-                @Override
-                public void run() {
-                    synchronized (this) {
-                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
-                        if (!this.hasShutdown) {
-                            this.hasShutdown = true;
-                            long beginTime = System.currentTimeMillis();
-                            controller.shutdown();
-                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
-                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
-                        }
-                    }
-                }
-            }, "ShutdownHook"));
-
+            Runtime.getRuntime().addShutdownHook(new Thread(buildShutdownHook(controller)));
             return controller;
         } catch (Throwable e) {
             e.printStackTrace();
             System.exit(-1);
         }
-
         return null;
     }
 
@@ -274,7 +276,7 @@ public class BrokerStartup {
         }
 
         public Properties loadConfig() throws Exception {
-            InputStream in = new BufferedInputStream(new FileInputStream(file));
+            InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
             Properties properties = new Properties();
             properties.load(in);
             in.close();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index f6628a158..57497f904 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -77,8 +77,8 @@ public class FilterServerManager {
 
     private String buildStartCommand() {
         String config = "";
-        if (BrokerStartup.configFile != null) {
-            config = String.format("-c %s", BrokerStartup.configFile);
+        if (BrokerStartup.CONFIG_FILE_HELPER.getFile() != null) {
+            config = String.format("-c %s", BrokerStartup.CONFIG_FILE_HELPER.getFile());
         }
 
         if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 8b3bb3d49..b9e0780cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -35,10 +35,10 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
 public class SubscriptionGroupManager extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
-    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+    private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
         new ConcurrentHashMap<>(1024);
 
-    private final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable =
+    private ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable =
         new ConcurrentHashMap<>(4);
 
     private final DataVersion dataVersion = new DataVersion();
@@ -273,6 +273,11 @@ public class SubscriptionGroupManager extends ConfigManager {
         return forbiddenTable;
     }
 
+    public void setForbiddenTable(
+        ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable) {
+        this.forbiddenTable = forbiddenTable;
+    }
+
     public DataVersion getDataVersion() {
         return dataVersion;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 87bf6b445..0e5d5370d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -56,10 +56,8 @@ public class TopicConfigManager extends ConfigManager {
     private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
 
     private transient final Lock topicConfigTableLock = new ReentrantLock();
-
-    private final ConcurrentMap<String, TopicConfig> topicConfigTable =
-        new ConcurrentHashMap<>(1024);
-    private final DataVersion dataVersion = new DataVersion();
+    private ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(1024);
+    private DataVersion dataVersion = new DataVersion();
     private transient BrokerController brokerController;
 
     public TopicConfigManager() {
@@ -606,6 +604,11 @@ public class TopicConfigManager extends ConfigManager {
         return dataVersion;
     }
 
+    public void setTopicConfigTable(
+        ConcurrentMap<String, TopicConfig> topicConfigTable) {
+        this.topicConfigTable = topicConfigTable;
+    }
+
     public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
         return topicConfigTable;
     }