You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/19 02:11:20 UTC

[rocketmq] 01/07: [ISSUE #5069] polish the startup of proxy; can specify parameters on the command line of proxy (#5083)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch release-5.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit e11dc27c495740abdc2c561acb3f5b700259a9af
Author: lk <ka...@alibaba-inc.com>
AuthorDate: Fri Sep 16 17:31:27 2022 +0800

    [ISSUE #5069] polish the startup of proxy; can specify parameters on the command line of proxy (#5083)
    
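    Summary of changes:
    - mqbroker accepts --enable-proxy and then launches ProxyStartup in
      local mode ("-pm local"), forwarding -c/--configFile as -bc.
    - "mqshutdown broker" also stops a proxy that was started in local mode.
    - ProxyStartup parses -bc/--brokerConfigPath, -pc/--proxyConfigPath,
      -pm/--proxyMode and -n (name server address) from the command line;
      command-line values override those loaded from the proxy config file.
    - ProxyConfig renames nameSrvAddr/nameSrvDomain/nameSrvDomainSubgroup to
      namesrvAddr/namesrvDomain/namesrvDomainSubgroup; namesrvAddr now
      defaults to the name server system property or environment variable.
    - Configuration.loadJsonConfig throws when the config file is missing
      or empty instead of returning null.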
---
 .licenserc.yaml                                    |   1 +
 distribution/bin/mqbroker                          |  35 +++-
 distribution/bin/mqshutdown                        |   7 +-
 distribution/conf/rmq-proxy.json                   |   2 +-
 proxy/BUILD.bazel                                  |   2 +
 .../apache/rocketmq/proxy/CommandLineArgument.java |  56 ++++++
 .../org/apache/rocketmq/proxy/ProxyStartup.java    |  85 +++++++-
 .../rocketmq/proxy/config/Configuration.java       |  28 +--
 .../apache/rocketmq/proxy/config/ProxyConfig.java  |  33 ++--
 .../proxy/service/mqclient/MQClientAPIFactory.java |   8 +-
 .../apache/rocketmq/proxy/ProxyStartupTest.java    | 220 +++++++++++++++++++++
 .../service/message/LocalMessageServiceTest.java   |   2 +-
 .../org.mockito.plugins.MockMaker                  |   1 +
 .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java   |   2 +-
 14 files changed, 438 insertions(+), 44 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 51741f903..0d732fa04 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -35,6 +35,7 @@ header:
     - '*/src/test/resources/META-INF/service/*'
     - '*/src/main/resources/META-INF/service/*'
     - '*/src/test/resources/rmq-proxy-home/conf/rmq-proxy.json'
+    - '*/src/test/resources/mockito-extensions/*'
     - '**/target/**'
     - '**/*.iml'
     - 'docs/**'
diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker
index 6a79c392e..17e39f07c 100644
--- a/distribution/bin/mqbroker
+++ b/distribution/bin/mqbroker
@@ -42,4 +42,37 @@ fi
 
 export ROCKETMQ_HOME
 
-sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
+other_args=" "
+enable_proxy=false
+
+while [[ $# -gt 0 ]]; do
+  case $1 in
+    --enable-proxy)
+      enable_proxy=true
+      shift
+      ;;
+    -c|--configFile)
+      broker_config="$2"
+      shift
+      shift
+      ;;
+    *)
+      other_args=${other_args}" "${1}
+      shift
+      ;;
+  esac
+done
+
+if [ "$enable_proxy" = true ]; then
+  args_for_proxy=$other_args" -pm local"
+  if [ "$broker_config" != "" ]; then
+      args_for_proxy=${args_for_proxy}" -bc "${broker_config}
+  fi
+  sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
+else
+  args_for_broker=$other_args
+  if [ "$broker_config" != "" ]; then
+      args_for_broker=${args_for_broker}" -c "${broker_config}
+  fi
+  sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
+fi
\ No newline at end of file
diff --git a/distribution/bin/mqshutdown b/distribution/bin/mqshutdown
index 6006cfeb1..df2da0c31 100644
--- a/distribution/bin/mqshutdown
+++ b/distribution/bin/mqshutdown
@@ -17,7 +17,12 @@
 
 case $1 in
     broker)
-
+    pid=`ps ax | grep -i 'org.apache.rocketmq.proxy.ProxyStartup' | grep '\-pm local' |grep java | grep -v grep | awk '{print $1}'`
+    if [ "$pid" != "" ] ; then
+            echo "The mqbroker with proxy enable is running(${pid})..."
+            kill ${pid}
+            echo "Send shutdown request to mqbroker with proxy enable OK(${pid})"
+    fi
     pid=`ps ax | grep -i 'org.apache.rocketmq.broker.BrokerStartup' |grep java | grep -v grep | awk '{print $1}'`
     if [ -z "$pid" ] ; then
             echo "No mqbroker running."
diff --git a/distribution/conf/rmq-proxy.json b/distribution/conf/rmq-proxy.json
index 077404aaa..8e92bb18e 100644
--- a/distribution/conf/rmq-proxy.json
+++ b/distribution/conf/rmq-proxy.json
@@ -1,3 +1,3 @@
 {
-  
+  "rocketMQClusterName": "DefaultCluster"
 }
\ No newline at end of file
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index eba692157..420267ec0 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -26,6 +26,7 @@ java_library(
         "//common",
         "//client",
         "//broker",
+        "//srvutil",
         "//acl",
         "@maven//:org_apache_rocketmq_rocketmq_proto",     
         "@maven//:org_apache_commons_commons_lang3",
@@ -50,6 +51,7 @@ java_library(
         "@maven//:ch_qos_logback_logback_classic",
         "@maven//:com_google_code_findbugs_jsr305",
         "@maven//:org_checkerframework_checker_qual",
+        "@maven//:commons_cli_commons_cli",
     ],
 )
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java
new file mode 100644
index 000000000..0499f2659
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy;
+
+public class CommandLineArgument {
+    private String namesrvAddr;
+    private String brokerConfigPath;
+    private String proxyConfigPath;
+    private String proxyMode;
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getBrokerConfigPath() {
+        return brokerConfigPath;
+    }
+
+    public void setBrokerConfigPath(String brokerConfigPath) {
+        this.brokerConfigPath = brokerConfigPath;
+    }
+
+    public String getProxyConfigPath() {
+        return proxyConfigPath;
+    }
+
+    public void setProxyConfigPath(String proxyConfigPath) {
+        this.proxyConfigPath = proxyConfigPath;
+    }
+
+    public String getProxyMode() {
+        return proxyMode;
+    }
+
+    public void setProxyMode(String proxyMode) {
+        this.proxyMode = proxyMode;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 9be0abe20..f605df0bf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy;
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
 import ch.qos.logback.core.joran.spi.JoranException;
+import com.google.common.collect.Lists;
 import io.grpc.protobuf.services.ChannelzService;
 import io.grpc.protobuf.services.ProtoReflectionService;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerStartup;
@@ -36,6 +42,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.Configuration;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.grpc.GrpcServer;
@@ -43,6 +50,8 @@ import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder;
 import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
 import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
 import org.slf4j.LoggerFactory;
 
 public class ProxyStartup {
@@ -58,9 +67,9 @@ public class ProxyStartup {
 
     public static void main(String[] args) {
         try {
-            ConfigurationManager.initEnv();
-            initLogger();
-            ConfigurationManager.intConfig();
+            // parse argument from command line
+            CommandLineArgument commandLineArgument = parseCommandLineArgument(args);
+            initLogAndConfiguration(commandLineArgument);
 
             // init thread pool monitor for proxy.
             initThreadPoolMonitor();
@@ -100,7 +109,59 @@ public class ProxyStartup {
         log.info(new Date() + " rocketmq-proxy startup successfully");
     }
 
-    private static MessagingProcessor createMessagingProcessor() {
+    protected static void initLogAndConfiguration(CommandLineArgument commandLineArgument) throws Exception {
+        if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
+            System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
+        }
+        ConfigurationManager.initEnv();
+        initLogger();
+        ConfigurationManager.intConfig();
+        setConfigFromCommandLineArgument(commandLineArgument);
+    }
+
+    protected static CommandLineArgument parseCommandLineArgument(String[] args) {
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
+            buildCommandlineOptions(), new DefaultParser());
+        if (commandLine == null) {
+            throw new RuntimeException("parse command line argument failed");
+        }
+
+        CommandLineArgument commandLineArgument = new CommandLineArgument();
+        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), commandLineArgument);
+        return commandLineArgument;
+    }
+
+    private static Options buildCommandlineOptions() {
+        Options options =  ServerUtil.buildCommandlineOptions(new Options());
+
+        Option opt = new Option("bc", "brokerConfigPath", true, "Broker config file path for local mode");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("pc", "proxyConfigPath", true, "Proxy config file path");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("pm", "proxyMode", true, "Proxy run in local or cluster mode");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) {
+        if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) {
+            ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr());
+        }
+        if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) {
+            ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath());
+        }
+        if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) {
+            ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode());
+        }
+    }
+
+    protected static MessagingProcessor createMessagingProcessor() {
         String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode();
         MessagingProcessor messagingProcessor;
 
@@ -112,6 +173,12 @@ public class ProxyStartup {
                 @Override
                 public void start() throws Exception {
                     brokerController.start();
+                    String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", "
+                        + brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+                    if (null != brokerController.getBrokerConfig().getNamesrvAddr()) {
+                        tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr();
+                    }
+                    log.info(tip);
                 }
 
                 @Override
@@ -134,8 +201,14 @@ public class ProxyStartup {
         return application;
     }
 
-    private static BrokerController createBrokerController() {
-        String[] brokerStartupArgs = new String[] {"-c", ConfigurationManager.getProxyConfig().getBrokerConfigPath()};
+    protected static BrokerController createBrokerController() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        List<String> brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath());
+        if (StringUtils.isNotBlank(config.getNamesrvAddr())) {
+            brokerStartupArgList.add("-n");
+            brokerStartupArgList.add(config.getNamesrvAddr());
+        }
+        String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]);
         return BrokerStartup.createBrokerController(brokerStartupArgs);
     }
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
index 59078c712..9c1ff811b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
@@ -26,6 +26,7 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,37 +34,38 @@ import org.slf4j.LoggerFactory;
 public class Configuration {
     private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     private final AtomicReference<ProxyConfig> proxyConfigReference = new AtomicReference<>();
+    public static final String CONFIG_PATH_PROPERTY = "com.rocketmq.proxy.configPath";
 
     public void init() throws Exception {
-        String proxyConfigData = loadJsonConfig(ProxyConfig.CONFIG_FILE_NAME);
-        if (null == proxyConfigData) {
-            throw new RuntimeException(String.format("load configuration from file: %s error.", ProxyConfig.CONFIG_FILE_NAME));
-        }
+        String proxyConfigData = loadJsonConfig();
 
         ProxyConfig proxyConfig = JSON.parseObject(proxyConfigData, ProxyConfig.class);
         proxyConfig.initData();
         setProxyConfig(proxyConfig);
     }
 
-    public static String loadJsonConfig(String configFileName) throws Exception {
-        final String testResource = "rmq-proxy-home/conf/" + configFileName;
-        try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
-            if (null != inputStream) {
-                return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
+    public static String loadJsonConfig() throws Exception {
+        String configFileName = ProxyConfig.DEFAULT_CONFIG_FILE_NAME;
+        String filePath = System.getProperty(CONFIG_PATH_PROPERTY);
+        if (StringUtils.isBlank(filePath)) {
+            final String testResource = "rmq-proxy-home/conf/" + configFileName;
+            try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
+                if (null != inputStream) {
+                    return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
+                }
             }
+            filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();
         }
 
-        String filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();
-
         File file = new File(filePath);
         if (!file.exists()) {
             log.warn("the config file {} not exist", filePath);
-            return null;
+            throw new RuntimeException(String.format("the config file %s not exist", filePath));
         }
         long fileLength = file.length();
         if (fileLength <= 0) {
             log.warn("the config file {} length is zero", filePath);
-            return null;
+            throw new RuntimeException(String.format("the config file %s length is zero", filePath));
         }
 
         return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 5a605f28b..00a6cc35e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.proxy.ProxyMode;
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 public class ProxyConfig implements ConfigFile {
     private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    public final static String CONFIG_FILE_NAME = "rmq-proxy.json";
+    public final static String DEFAULT_CONFIG_FILE_NAME = "rmq-proxy.json";
     private static final int PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors();
 
     private String rocketMQClusterName = "";
@@ -44,9 +45,9 @@ public class ProxyConfig implements ConfigFile {
     private long printJstackInMillis = Duration.ofSeconds(60).toMillis();
     private long printThreadPoolStatusInMillis = Duration.ofSeconds(3).toMillis();
 
-    private String nameSrvAddr = "";
-    private String nameSrvDomain = "";
-    private String nameSrvDomainSubgroup = "";
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+    private String namesrvDomain = "";
+    private String namesrvDomainSubgroup = "";
     /**
      * gRPC
      */
@@ -234,28 +235,28 @@ public class ProxyConfig implements ConfigFile {
         this.printThreadPoolStatusInMillis = printThreadPoolStatusInMillis;
     }
 
-    public String getNameSrvAddr() {
-        return nameSrvAddr;
+    public String getNamesrvAddr() {
+        return namesrvAddr;
     }
 
-    public void setNameSrvAddr(String nameSrvAddr) {
-        this.nameSrvAddr = nameSrvAddr;
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
     }
 
-    public String getNameSrvDomain() {
-        return nameSrvDomain;
+    public String getNamesrvDomain() {
+        return namesrvDomain;
     }
 
-    public void setNameSrvDomain(String nameSrvDomain) {
-        this.nameSrvDomain = nameSrvDomain;
+    public void setNamesrvDomain(String namesrvDomain) {
+        this.namesrvDomain = namesrvDomain;
     }
 
-    public String getNameSrvDomainSubgroup() {
-        return nameSrvDomainSubgroup;
+    public String getNamesrvDomainSubgroup() {
+        return namesrvDomainSubgroup;
     }
 
-    public void setNameSrvDomainSubgroup(String nameSrvDomainSubgroup) {
-        this.nameSrvDomainSubgroup = nameSrvDomainSubgroup;
+    public void setNamesrvDomainSubgroup(String namesrvDomainSubgroup) {
+        this.namesrvDomainSubgroup = namesrvDomainSubgroup;
     }
 
     public String getProxyMode() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
index 0b813ae60..9d7db6cf7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
@@ -54,11 +54,11 @@ public class MQClientAPIFactory implements StartAndShutdown {
     protected void init() {
         System.setProperty(ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false");
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        if (StringUtils.isEmpty(proxyConfig.getNameSrvDomain())) {
-            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNameSrvAddr());
+        if (StringUtils.isEmpty(proxyConfig.getNamesrvDomain())) {
+            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNamesrvAddr());
         } else {
-            System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNameSrvDomain());
-            System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNameSrvDomainSubgroup());
+            System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNamesrvDomain());
+            System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNamesrvDomainSubgroup());
         }
     }
 
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java
new file mode 100644
index 000000000..6adf7f3fb
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy;
+
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.proxy.config.Configuration;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
+import org.assertj.core.util.Strings;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+
+import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+
+public class ProxyStartupTest {
+
+    public static String mockProxyHome = "/mock/rmq/proxy/home";
+
+    @Before
+    public void before() throws Throwable {
+        URL mockProxyHomeURL = getClass().getClassLoader().getResource("rmq-proxy-home");
+        if (mockProxyHomeURL != null) {
+            mockProxyHome = mockProxyHomeURL.toURI().getPath();
+        }
+
+        if (!Strings.isNullOrEmpty(mockProxyHome)) {
+            System.setProperty(RMQ_PROXY_HOME, mockProxyHome);
+        }
+    }
+
+    @After
+    public void after() {
+        System.clearProperty(RMQ_PROXY_HOME);
+        System.clearProperty(MixAll.NAMESRV_ADDR_PROPERTY);
+        System.clearProperty(Configuration.CONFIG_PATH_PROPERTY);
+        System.clearProperty(ClientLogger.CLIENT_LOG_USESLF4J);
+    }
+
+    @Test
+    public void testParseAndInitCommandLineArgument() throws Exception {
+        Path configFilePath = Files.createTempFile("testParseAndInitCommandLineArgument", ".json");
+        String configData = "{}";
+        Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+        String brokerConfigPath = "brokerConfigPath";
+        String proxyConfigPath = configFilePath.toAbsolutePath().toString();
+        String proxyMode = "LOCAL";
+        String namesrvAddr = "namesrvAddr";
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-bc", brokerConfigPath,
+            "-pc", proxyConfigPath,
+            "-pm", proxyMode,
+            "-n", namesrvAddr
+        });
+
+        assertEquals(brokerConfigPath, commandLineArgument.getBrokerConfigPath());
+        assertEquals(proxyConfigPath, commandLineArgument.getProxyConfigPath());
+        assertEquals(proxyMode, commandLineArgument.getProxyMode());
+        assertEquals(namesrvAddr, commandLineArgument.getNamesrvAddr());
+
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        assertEquals(brokerConfigPath, config.getBrokerConfigPath());
+        assertEquals(proxyMode, config.getProxyMode());
+        assertEquals(namesrvAddr, config.getNamesrvAddr());
+    }
+
+    @Test
+    public void testLocalModeWithNameSrvAddrByProperty() throws Exception {
+        String namesrvAddr = "namesrvAddr";
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-pm", "local"
+        });
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+        validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+    }
+
+    private void validateBrokerCreateArgsWithNamsrvAddr(ProxyConfig config, String namesrvAddr) {
+        try (MockedStatic<BrokerStartup> brokerStartupMocked = mockStatic(BrokerStartup.class);
+             MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) {
+            ArgumentCaptor<Object> args = ArgumentCaptor.forClass(Object.class);
+            brokerStartupMocked.when(() -> BrokerStartup.createBrokerController((String[]) args.capture()))
+                .thenReturn(mock(BrokerController.class));
+            messagingProcessorMocked.when(() -> DefaultMessagingProcessor.createForLocalMode(any(), any()))
+                .thenReturn(mock(DefaultMessagingProcessor.class));
+
+            ProxyStartup.createMessagingProcessor();
+            String[] passArgs = (String[]) args.getValue();
+            assertEquals("-c", passArgs[0]);
+            assertEquals(config.getBrokerConfigPath(), passArgs[1]);
+            assertEquals("-n", passArgs[2]);
+            assertEquals(namesrvAddr, passArgs[3]);
+            assertEquals(4, passArgs.length);
+        }
+    }
+
+    @Test
+    public void testLocalModeWithNameSrvAddrByConfigFile() throws Exception {
+        String namesrvAddr = "namesrvAddr";
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+        Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByConfigFile", ".json");
+        String configData = "{\n" +
+            "  \"namesrvAddr\": \"namesrvAddr\"\n" +
+            "}";
+        Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-pm", "local",
+            "-pc", configFilePath.toAbsolutePath().toString()
+        });
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+        validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+    }
+
+    @Test
+    public void testLocalModeWithNameSrvAddrByCommandLine() throws Exception {
+        String namesrvAddr = "namesrvAddr";
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+        Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByCommandLine", ".json");
+        String configData = "{\n" +
+            "  \"namesrvAddr\": \"foo\"\n" +
+            "}";
+        Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-pm", "local",
+            "-pc", configFilePath.toAbsolutePath().toString(),
+            "-n", namesrvAddr
+        });
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+        validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+    }
+
+    @Test
+    public void testLocalModeWithAllArgs() throws Exception {
+        String namesrvAddr = "namesrvAddr";
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+        Path configFilePath = Files.createTempFile("testLocalMode", ".json");
+        String configData = "{\n" +
+            "  \"namesrvAddr\": \"foo\"\n" +
+            "}";
+        Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+        Path brokerConfigFilePath = Files.createTempFile("testLocalModeBrokerConfig", ".json");
+
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-pm", "local",
+            "-pc", configFilePath.toAbsolutePath().toString(),
+            "-n", namesrvAddr,
+            "-bc", brokerConfigFilePath.toAbsolutePath().toString()
+        });
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        assertEquals(namesrvAddr, config.getNamesrvAddr());
+        assertEquals(brokerConfigFilePath.toAbsolutePath().toString(), config.getBrokerConfigPath());
+
+        validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+    }
+
+    @Test
+    public void testClusterMode() throws Exception {
+        CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+            "-pm", "cluster"
+        });
+        ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+        try (MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) {
+            DefaultMessagingProcessor processor = mock(DefaultMessagingProcessor.class);
+            messagingProcessorMocked.when(DefaultMessagingProcessor::createForClusterMode)
+                .thenReturn(processor);
+
+            assertSame(processor, ProxyStartup.createMessagingProcessor());
+        }
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index e3f6edb99..e46464a6d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -114,7 +114,7 @@ public class LocalMessageServiceTest extends InitConfigAndLoggerTest {
     @Before
     public void setUp() throws Throwable {
         super.before();
-        ConfigurationManager.getProxyConfig().setNameSrvAddr("1.1.1.1");
+        ConfigurationManager.getProxyConfig().setNamesrvAddr("1.1.1.1");
         channelManager = new ChannelManager();
         Mockito.when(brokerControllerMock.getSendMessageProcessor()).thenReturn(sendMessageProcessorMock);
         Mockito.when(brokerControllerMock.getPopMessageProcessor()).thenReturn(popMessageProcessorMock);
diff --git a/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 000000000..ca6ee9cea
--- /dev/null
+++ b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index d88b6b1c0..586149cd1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -152,7 +152,7 @@ public class GrpcBaseIT extends BaseConf {
 
         ConfigurationManager.initEnv();
         ConfigurationManager.intConfig();
-        ConfigurationManager.getProxyConfig().setNameSrvAddr(nsAddr);
+        ConfigurationManager.getProxyConfig().setNamesrvAddr(nsAddr);
         // Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT
         ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500);
         ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());