You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/19 02:11:20 UTC
[rocketmq] 01/07: [ISSUE #5069] polish the startup of proxy; can specify parameters on the command line of proxy (#5083)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch release-5.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit e11dc27c495740abdc2c561acb3f5b700259a9af
Author: lk <ka...@alibaba-inc.com>
AuthorDate: Fri Sep 16 17:31:27 2022 +0800
[ISSUE #5069] polish the startup of proxy; can specify parameters on the command line of proxy (#5083)
undefined
---
.licenserc.yaml | 1 +
distribution/bin/mqbroker | 35 +++-
distribution/bin/mqshutdown | 7 +-
distribution/conf/rmq-proxy.json | 2 +-
proxy/BUILD.bazel | 2 +
.../apache/rocketmq/proxy/CommandLineArgument.java | 56 ++++++
.../org/apache/rocketmq/proxy/ProxyStartup.java | 85 +++++++-
.../rocketmq/proxy/config/Configuration.java | 28 +--
.../apache/rocketmq/proxy/config/ProxyConfig.java | 33 ++--
.../proxy/service/mqclient/MQClientAPIFactory.java | 8 +-
.../apache/rocketmq/proxy/ProxyStartupTest.java | 220 +++++++++++++++++++++
.../service/message/LocalMessageServiceTest.java | 2 +-
.../org.mockito.plugins.MockMaker | 1 +
.../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 2 +-
14 files changed, 438 insertions(+), 44 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 51741f903..0d732fa04 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -35,6 +35,7 @@ header:
- '*/src/test/resources/META-INF/service/*'
- '*/src/main/resources/META-INF/service/*'
- '*/src/test/resources/rmq-proxy-home/conf/rmq-proxy.json'
+ - '*/src/test/resources/mockito-extensions/*'
- '**/target/**'
- '**/*.iml'
- 'docs/**'
diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker
index 6a79c392e..17e39f07c 100644
--- a/distribution/bin/mqbroker
+++ b/distribution/bin/mqbroker
@@ -42,4 +42,37 @@ fi
export ROCKETMQ_HOME
-sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
+other_args=" "
+enable_proxy=false
+
+while [[ $# -gt 0 ]]; do
+ case $1 in
+ --enable-proxy)
+ enable_proxy=true
+ shift
+ ;;
+ -c|--configFile)
+ broker_config="$2"
+ shift
+ shift
+ ;;
+ *)
+ other_args=${other_args}" "${1}
+ shift
+ ;;
+ esac
+done
+
+if [ "$enable_proxy" = true ]; then
+ args_for_proxy=$other_args" -pm local"
+ if [ "$broker_config" != "" ]; then
+ args_for_proxy=${args_for_proxy}" -bc "${broker_config}
+ fi
+ sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
+else
+ args_for_broker=$other_args
+ if [ "$broker_config" != "" ]; then
+ args_for_broker=${args_for_broker}" -c "${broker_config}
+ fi
+ sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
+fi
\ No newline at end of file
diff --git a/distribution/bin/mqshutdown b/distribution/bin/mqshutdown
index 6006cfeb1..df2da0c31 100644
--- a/distribution/bin/mqshutdown
+++ b/distribution/bin/mqshutdown
@@ -17,7 +17,12 @@
case $1 in
broker)
-
+ pid=`ps ax | grep -i 'org.apache.rocketmq.proxy.ProxyStartup' | grep '\-pm local' |grep java | grep -v grep | awk '{print $1}'`
+ if [ "$pid" != "" ] ; then
+ echo "The mqbroker with proxy enable is running(${pid})..."
+ kill ${pid}
+ echo "Send shutdown request to mqbroker with proxy enable OK(${pid})"
+ fi
pid=`ps ax | grep -i 'org.apache.rocketmq.broker.BrokerStartup' |grep java | grep -v grep | awk '{print $1}'`
if [ -z "$pid" ] ; then
echo "No mqbroker running."
diff --git a/distribution/conf/rmq-proxy.json b/distribution/conf/rmq-proxy.json
index 077404aaa..8e92bb18e 100644
--- a/distribution/conf/rmq-proxy.json
+++ b/distribution/conf/rmq-proxy.json
@@ -1,3 +1,3 @@
{
-
+ "rocketMQClusterName": "DefaultCluster"
}
\ No newline at end of file
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index eba692157..420267ec0 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -26,6 +26,7 @@ java_library(
"//common",
"//client",
"//broker",
+ "//srvutil",
"//acl",
"@maven//:org_apache_rocketmq_rocketmq_proto",
"@maven//:org_apache_commons_commons_lang3",
@@ -50,6 +51,7 @@ java_library(
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_google_code_findbugs_jsr305",
"@maven//:org_checkerframework_checker_qual",
+ "@maven//:commons_cli_commons_cli",
],
)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java
new file mode 100644
index 000000000..0499f2659
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/CommandLineArgument.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy;
+
+public class CommandLineArgument {
+ private String namesrvAddr;
+ private String brokerConfigPath;
+ private String proxyConfigPath;
+ private String proxyMode;
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public String getBrokerConfigPath() {
+ return brokerConfigPath;
+ }
+
+ public void setBrokerConfigPath(String brokerConfigPath) {
+ this.brokerConfigPath = brokerConfigPath;
+ }
+
+ public String getProxyConfigPath() {
+ return proxyConfigPath;
+ }
+
+ public void setProxyConfigPath(String proxyConfigPath) {
+ this.proxyConfigPath = proxyConfigPath;
+ }
+
+ public String getProxyMode() {
+ return proxyMode;
+ }
+
+ public void setProxyMode(String proxyMode) {
+ this.proxyMode = proxyMode;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 9be0abe20..f605df0bf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -20,11 +20,17 @@ package org.apache.rocketmq.proxy;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
+import com.google.common.collect.Lists;
import io.grpc.protobuf.services.ChannelzService;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
@@ -36,6 +42,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.Configuration;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.GrpcServer;
@@ -43,6 +50,8 @@ import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
public class ProxyStartup {
@@ -58,9 +67,9 @@ public class ProxyStartup {
public static void main(String[] args) {
try {
- ConfigurationManager.initEnv();
- initLogger();
- ConfigurationManager.intConfig();
+ // parse argument from command line
+ CommandLineArgument commandLineArgument = parseCommandLineArgument(args);
+ initLogAndConfiguration(commandLineArgument);
// init thread pool monitor for proxy.
initThreadPoolMonitor();
@@ -100,7 +109,59 @@ public class ProxyStartup {
log.info(new Date() + " rocketmq-proxy startup successfully");
}
- private static MessagingProcessor createMessagingProcessor() {
+ protected static void initLogAndConfiguration(CommandLineArgument commandLineArgument) throws Exception {
+ if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
+ System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
+ }
+ ConfigurationManager.initEnv();
+ initLogger();
+ ConfigurationManager.intConfig();
+ setConfigFromCommandLineArgument(commandLineArgument);
+ }
+
+ protected static CommandLineArgument parseCommandLineArgument(String[] args) {
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
+ buildCommandlineOptions(), new DefaultParser());
+ if (commandLine == null) {
+ throw new RuntimeException("parse command line argument failed");
+ }
+
+ CommandLineArgument commandLineArgument = new CommandLineArgument();
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), commandLineArgument);
+ return commandLineArgument;
+ }
+
+ private static Options buildCommandlineOptions() {
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+ Option opt = new Option("bc", "brokerConfigPath", true, "Broker config file path for local mode");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("pc", "proxyConfigPath", true, "Proxy config file path");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("pm", "proxyMode", true, "Proxy run in local or cluster mode");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) {
+ if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) {
+ ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr());
+ }
+ if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) {
+ ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath());
+ }
+ if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) {
+ ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode());
+ }
+ }
+
+ protected static MessagingProcessor createMessagingProcessor() {
String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode();
MessagingProcessor messagingProcessor;
@@ -112,6 +173,12 @@ public class ProxyStartup {
@Override
public void start() throws Exception {
brokerController.start();
+ String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", "
+ + brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ if (null != brokerController.getBrokerConfig().getNamesrvAddr()) {
+ tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr();
+ }
+ log.info(tip);
}
@Override
@@ -134,8 +201,14 @@ public class ProxyStartup {
return application;
}
- private static BrokerController createBrokerController() {
- String[] brokerStartupArgs = new String[] {"-c", ConfigurationManager.getProxyConfig().getBrokerConfigPath()};
+ protected static BrokerController createBrokerController() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ List<String> brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath());
+ if (StringUtils.isNotBlank(config.getNamesrvAddr())) {
+ brokerStartupArgList.add("-n");
+ brokerStartupArgList.add(config.getNamesrvAddr());
+ }
+ String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]);
return BrokerStartup.createBrokerController(brokerStartupArgs);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
index 59078c712..9c1ff811b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/Configuration.java
@@ -26,6 +26,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,37 +34,38 @@ import org.slf4j.LoggerFactory;
public class Configuration {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final AtomicReference<ProxyConfig> proxyConfigReference = new AtomicReference<>();
+ public static final String CONFIG_PATH_PROPERTY = "com.rocketmq.proxy.configPath";
public void init() throws Exception {
- String proxyConfigData = loadJsonConfig(ProxyConfig.CONFIG_FILE_NAME);
- if (null == proxyConfigData) {
- throw new RuntimeException(String.format("load configuration from file: %s error.", ProxyConfig.CONFIG_FILE_NAME));
- }
+ String proxyConfigData = loadJsonConfig();
ProxyConfig proxyConfig = JSON.parseObject(proxyConfigData, ProxyConfig.class);
proxyConfig.initData();
setProxyConfig(proxyConfig);
}
- public static String loadJsonConfig(String configFileName) throws Exception {
- final String testResource = "rmq-proxy-home/conf/" + configFileName;
- try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
- if (null != inputStream) {
- return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
+ public static String loadJsonConfig() throws Exception {
+ String configFileName = ProxyConfig.DEFAULT_CONFIG_FILE_NAME;
+ String filePath = System.getProperty(CONFIG_PATH_PROPERTY);
+ if (StringUtils.isBlank(filePath)) {
+ final String testResource = "rmq-proxy-home/conf/" + configFileName;
+ try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
+ if (null != inputStream) {
+ return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
+ }
}
+ filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();
}
- String filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();
-
File file = new File(filePath);
if (!file.exists()) {
log.warn("the config file {} not exist", filePath);
- return null;
+ throw new RuntimeException(String.format("the config file %s not exist", filePath));
}
long fileLength = file.length();
if (fileLength <= 0) {
log.warn("the config file {} length is zero", filePath);
- return null;
+ throw new RuntimeException(String.format("the config file %s length is zero", filePath));
}
return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 5a605f28b..00a6cc35e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.proxy.ProxyMode;
import org.slf4j.Logger;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
public class ProxyConfig implements ConfigFile {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- public final static String CONFIG_FILE_NAME = "rmq-proxy.json";
+ public final static String DEFAULT_CONFIG_FILE_NAME = "rmq-proxy.json";
private static final int PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors();
private String rocketMQClusterName = "";
@@ -44,9 +45,9 @@ public class ProxyConfig implements ConfigFile {
private long printJstackInMillis = Duration.ofSeconds(60).toMillis();
private long printThreadPoolStatusInMillis = Duration.ofSeconds(3).toMillis();
- private String nameSrvAddr = "";
- private String nameSrvDomain = "";
- private String nameSrvDomainSubgroup = "";
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private String namesrvDomain = "";
+ private String namesrvDomainSubgroup = "";
/**
* gRPC
*/
@@ -234,28 +235,28 @@ public class ProxyConfig implements ConfigFile {
this.printThreadPoolStatusInMillis = printThreadPoolStatusInMillis;
}
- public String getNameSrvAddr() {
- return nameSrvAddr;
+ public String getNamesrvAddr() {
+ return namesrvAddr;
}
- public void setNameSrvAddr(String nameSrvAddr) {
- this.nameSrvAddr = nameSrvAddr;
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
}
- public String getNameSrvDomain() {
- return nameSrvDomain;
+ public String getNamesrvDomain() {
+ return namesrvDomain;
}
- public void setNameSrvDomain(String nameSrvDomain) {
- this.nameSrvDomain = nameSrvDomain;
+ public void setNamesrvDomain(String namesrvDomain) {
+ this.namesrvDomain = namesrvDomain;
}
- public String getNameSrvDomainSubgroup() {
- return nameSrvDomainSubgroup;
+ public String getNamesrvDomainSubgroup() {
+ return namesrvDomainSubgroup;
}
- public void setNameSrvDomainSubgroup(String nameSrvDomainSubgroup) {
- this.nameSrvDomainSubgroup = nameSrvDomainSubgroup;
+ public void setNamesrvDomainSubgroup(String namesrvDomainSubgroup) {
+ this.namesrvDomainSubgroup = namesrvDomainSubgroup;
}
public String getProxyMode() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
index 0b813ae60..9d7db6cf7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIFactory.java
@@ -54,11 +54,11 @@ public class MQClientAPIFactory implements StartAndShutdown {
protected void init() {
System.setProperty(ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false");
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- if (StringUtils.isEmpty(proxyConfig.getNameSrvDomain())) {
- System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNameSrvAddr());
+ if (StringUtils.isEmpty(proxyConfig.getNamesrvDomain())) {
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, proxyConfig.getNamesrvAddr());
} else {
- System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNameSrvDomain());
- System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNameSrvDomainSubgroup());
+ System.setProperty("rocketmq.namesrv.domain", proxyConfig.getNamesrvDomain());
+ System.setProperty("rocketmq.namesrv.domain.subgroup", proxyConfig.getNamesrvDomainSubgroup());
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java
new file mode 100644
index 000000000..6adf7f3fb
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy;
+
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.proxy.config.Configuration;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
+import org.assertj.core.util.Strings;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+
+import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+
+public class ProxyStartupTest {
+
+ public static String mockProxyHome = "/mock/rmq/proxy/home";
+
+ @Before
+ public void before() throws Throwable {
+ URL mockProxyHomeURL = getClass().getClassLoader().getResource("rmq-proxy-home");
+ if (mockProxyHomeURL != null) {
+ mockProxyHome = mockProxyHomeURL.toURI().getPath();
+ }
+
+ if (!Strings.isNullOrEmpty(mockProxyHome)) {
+ System.setProperty(RMQ_PROXY_HOME, mockProxyHome);
+ }
+ }
+
+ @After
+ public void after() {
+ System.clearProperty(RMQ_PROXY_HOME);
+ System.clearProperty(MixAll.NAMESRV_ADDR_PROPERTY);
+ System.clearProperty(Configuration.CONFIG_PATH_PROPERTY);
+ System.clearProperty(ClientLogger.CLIENT_LOG_USESLF4J);
+ }
+
+ @Test
+ public void testParseAndInitCommandLineArgument() throws Exception {
+ Path configFilePath = Files.createTempFile("testParseAndInitCommandLineArgument", ".json");
+ String configData = "{}";
+ Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+ String brokerConfigPath = "brokerConfigPath";
+ String proxyConfigPath = configFilePath.toAbsolutePath().toString();
+ String proxyMode = "LOCAL";
+ String namesrvAddr = "namesrvAddr";
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-bc", brokerConfigPath,
+ "-pc", proxyConfigPath,
+ "-pm", proxyMode,
+ "-n", namesrvAddr
+ });
+
+ assertEquals(brokerConfigPath, commandLineArgument.getBrokerConfigPath());
+ assertEquals(proxyConfigPath, commandLineArgument.getProxyConfigPath());
+ assertEquals(proxyMode, commandLineArgument.getProxyMode());
+ assertEquals(namesrvAddr, commandLineArgument.getNamesrvAddr());
+
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ assertEquals(brokerConfigPath, config.getBrokerConfigPath());
+ assertEquals(proxyMode, config.getProxyMode());
+ assertEquals(namesrvAddr, config.getNamesrvAddr());
+ }
+
+ @Test
+ public void testLocalModeWithNameSrvAddrByProperty() throws Exception {
+ String namesrvAddr = "namesrvAddr";
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-pm", "local"
+ });
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+ validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+ }
+
+ private void validateBrokerCreateArgsWithNamsrvAddr(ProxyConfig config, String namesrvAddr) {
+ try (MockedStatic<BrokerStartup> brokerStartupMocked = mockStatic(BrokerStartup.class);
+ MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) {
+ ArgumentCaptor<Object> args = ArgumentCaptor.forClass(Object.class);
+ brokerStartupMocked.when(() -> BrokerStartup.createBrokerController((String[]) args.capture()))
+ .thenReturn(mock(BrokerController.class));
+ messagingProcessorMocked.when(() -> DefaultMessagingProcessor.createForLocalMode(any(), any()))
+ .thenReturn(mock(DefaultMessagingProcessor.class));
+
+ ProxyStartup.createMessagingProcessor();
+ String[] passArgs = (String[]) args.getValue();
+ assertEquals("-c", passArgs[0]);
+ assertEquals(config.getBrokerConfigPath(), passArgs[1]);
+ assertEquals("-n", passArgs[2]);
+ assertEquals(namesrvAddr, passArgs[3]);
+ assertEquals(4, passArgs.length);
+ }
+ }
+
+ @Test
+ public void testLocalModeWithNameSrvAddrByConfigFile() throws Exception {
+ String namesrvAddr = "namesrvAddr";
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+ Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByConfigFile", ".json");
+ String configData = "{\n" +
+ " \"namesrvAddr\": \"namesrvAddr\"\n" +
+ "}";
+ Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-pm", "local",
+ "-pc", configFilePath.toAbsolutePath().toString()
+ });
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+ validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+ }
+
+ @Test
+ public void testLocalModeWithNameSrvAddrByCommandLine() throws Exception {
+ String namesrvAddr = "namesrvAddr";
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+ Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByCommandLine", ".json");
+ String configData = "{\n" +
+ " \"namesrvAddr\": \"foo\"\n" +
+ "}";
+ Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-pm", "local",
+ "-pc", configFilePath.toAbsolutePath().toString(),
+ "-n", namesrvAddr
+ });
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ assertEquals(namesrvAddr, config.getNamesrvAddr());
+
+ validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+ }
+
+ @Test
+ public void testLocalModeWithAllArgs() throws Exception {
+ String namesrvAddr = "namesrvAddr";
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo");
+ Path configFilePath = Files.createTempFile("testLocalMode", ".json");
+ String configData = "{\n" +
+ " \"namesrvAddr\": \"foo\"\n" +
+ "}";
+ Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8));
+ Path brokerConfigFilePath = Files.createTempFile("testLocalModeBrokerConfig", ".json");
+
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-pm", "local",
+ "-pc", configFilePath.toAbsolutePath().toString(),
+ "-n", namesrvAddr,
+ "-bc", brokerConfigFilePath.toAbsolutePath().toString()
+ });
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ assertEquals(namesrvAddr, config.getNamesrvAddr());
+ assertEquals(brokerConfigFilePath.toAbsolutePath().toString(), config.getBrokerConfigPath());
+
+ validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr);
+ }
+
+ @Test
+ public void testClusterMode() throws Exception {
+ CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] {
+ "-pm", "cluster"
+ });
+ ProxyStartup.initLogAndConfiguration(commandLineArgument);
+
+ try (MockedStatic<DefaultMessagingProcessor> messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) {
+ DefaultMessagingProcessor processor = mock(DefaultMessagingProcessor.class);
+ messagingProcessorMocked.when(DefaultMessagingProcessor::createForClusterMode)
+ .thenReturn(processor);
+
+ assertSame(processor, ProxyStartup.createMessagingProcessor());
+ }
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index e3f6edb99..e46464a6d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -114,7 +114,7 @@ public class LocalMessageServiceTest extends InitConfigAndLoggerTest {
@Before
public void setUp() throws Throwable {
super.before();
- ConfigurationManager.getProxyConfig().setNameSrvAddr("1.1.1.1");
+ ConfigurationManager.getProxyConfig().setNamesrvAddr("1.1.1.1");
channelManager = new ChannelManager();
Mockito.when(brokerControllerMock.getSendMessageProcessor()).thenReturn(sendMessageProcessorMock);
Mockito.when(brokerControllerMock.getPopMessageProcessor()).thenReturn(popMessageProcessorMock);
diff --git a/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 000000000..ca6ee9cea
--- /dev/null
+++ b/proxy/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index d88b6b1c0..586149cd1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -152,7 +152,7 @@ public class GrpcBaseIT extends BaseConf {
ConfigurationManager.initEnv();
ConfigurationManager.intConfig();
- ConfigurationManager.getProxyConfig().setNameSrvAddr(nsAddr);
+ ConfigurationManager.getProxyConfig().setNamesrvAddr(nsAddr);
// Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT
ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500);
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());