You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/14 06:06:01 UTC
[rocketmq] branch 5.0.0-beta-tmp updated: feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-tmp by this push:
new a48d286 feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)
a48d286 is described below
commit a48d2869b89d86d6b9c267cf7a250b43f693047a
Author: rongtong <ji...@163.com>
AuthorDate: Mon Mar 14 14:05:53 2022 +0800
feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)
Co-authored-by: rongtong.jrt <ro...@alibaba-inc.com>
---
container/pom.xml | 36 ++
.../apache/rocketmq/container/BrokerBootHook.java | 48 ++
.../apache/rocketmq/container/BrokerContainer.java | 435 +++++++++++++++++++
.../rocketmq/container/BrokerContainerConfig.java | 84 ++++
.../container/BrokerContainerProcessor.java | 273 ++++++++++++
.../rocketmq/container/BrokerContainerStartup.java | 482 +++++++++++++++++++++
.../rocketmq/container/BrokerPreOnlineService.java | 277 ++++++++++++
.../ContainerClientHouseKeepingService.java | 104 +++++
.../rocketmq/container/IBrokerContainer.java | 142 ++++++
.../rocketmq/container/InnerBrokerController.java | 378 ++++++++++++++++
.../container/InnerSalveBrokerController.java | 160 +++++++
.../logback/BrokerLogbackConfigurator.java | 187 ++++++++
.../container/BrokerContainerStartupTest.java | 151 +++++++
.../rocketmq/container/BrokerContainerTest.java | 339 +++++++++++++++
.../rocketmq/container/BrokerPreOnlineTest.java | 102 +++++
pom.xml | 20 +-
.../java/org/apache/rocketmq/remoting/RPCHook.java | 4 +-
.../apache/rocketmq/remoting/RemotingClient.java | 13 +-
.../apache/rocketmq/remoting/RemotingServer.java | 6 +
.../apache/rocketmq/remoting/RemotingService.java | 5 +
.../rocketmq/remoting/common/RemotingHelper.java | 38 ++
.../rocketmq/remoting/common/RemotingUtil.java | 18 +-
.../rocketmq/remoting/netty/NettyClientConfig.java | 38 ++
.../remoting/netty/NettyRemotingAbstract.java | 115 +++--
.../remoting/netty/NettyRemotingClient.java | 396 +++++++++++++----
.../remoting/netty/NettyRemotingServer.java | 158 ++++++-
.../remoting/netty/NettyRequestProcessor.java | 1 -
.../rocketmq/remoting/netty/ResponseFuture.java | 42 +-
.../remoting/protocol/RemotingCommand.java | 8 +-
.../rocketmq/remoting/RemotingServerTest.java | 5 +-
.../rocketmq/remoting/SubRemotingServerTest.java | 109 +++++
.../remoting/netty/NettyRemotingAbstractTest.java | 6 +-
.../remoting/protocol/RemotingCommandTest.java | 1 +
33 files changed, 3995 insertions(+), 186 deletions(-)
diff --git a/container/pom.xml b/container/pom.xml
new file mode 100644
index 0000000..a7f13d1
--- /dev/null
+++ b/container/pom.xml
@@ -0,0 +1,36 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>5.0.0-BETA-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-container</artifactId>
+ <name>rocketmq-container ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java b/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java
new file mode 100644
index 0000000..fe126af
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.util.Properties;
+import org.apache.rocketmq.broker.BrokerController;
+
+/**
+ * Callback that lets extensions run custom logic around the start of a broker that is hosted in a
+ * {@link BrokerContainer}.
+ *
+ * <p>{@code BrokerContainerProcessor#addBroker} invokes {@link #executeBeforeStart} on every registered
+ * hook before calling {@code BrokerController#start()} and {@link #executeAfterStart} on every hook
+ * afterwards. An exception thrown by either callback makes the add-broker request fail and the
+ * broker is removed from the container again.
+ */
+public interface BrokerBootHook {
+ /**
+ * Name of the hook. The container logs it when the hook is registered.
+ *
+ * @return name of the hook
+ */
+ String hookName();
+
+ /**
+ * Code to execute before broker start.
+ *
+ * @param brokerController broker to start
+ * @param properties broker properties
+ * @throws Exception when execute hook
+ */
+ void executeBeforeStart(BrokerController brokerController, Properties properties) throws Exception;
+
+ /**
+ * Code to execute after broker start.
+ *
+ * @param brokerController broker to start
+ * @param properties broker properties
+ * @throws Exception when execute hook
+ */
+ void executeAfterStart(BrokerController brokerController, Properties properties) throws Exception;
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
new file mode 100644
index 0000000..50b8bc4
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.container.logback.BrokerLogbackConfigurator;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public class BrokerContainer implements IBrokerContainer {
+ private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ // Single daemon thread that runs the periodic name-server and metadata refresh tasks scheduled in initialize().
+ private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ new BasicThreadFactory.Builder()
+ .namingPattern("BrokerContainerScheduledThread")
+ .daemon(true)
+ .build());
+ private final NettyServerConfig nettyServerConfig;
+ private final NettyClientConfig nettyClientConfig;
+ // Outbound client built from nettyClientConfig; started in start() and stopped in shutdown().
+ private final BrokerOuterAPI brokerOuterAPI;
+ private final ContainerClientHouseKeepingService containerClientHouseKeepingService;
+
+ // Brokers hosted by this container, keyed by (cluster name, broker name, broker id).
+ private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
+ // The same list instance is handed to brokerContainerProcessor in the constructor.
+ private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
+ private final BrokerContainerProcessor brokerContainerProcessor;
+ private final Configuration configuration;
+ private final BrokerContainerConfig brokerContainerConfig;
+
+ // The three fields below stay null until initialize() has been called.
+ private RemotingServer remotingServer;
+ private RemotingServer fastRemotingServer;
+ private ExecutorService brokerContainerExecutor;
+
+ /**
+ * Wires the container's collaborators together. No server is created or started here; that
+ * happens in {@link #initialize()} and {@link #start()}.
+ *
+ * @param brokerContainerConfig container level settings
+ * @param nettyServerConfig settings of the remoting server created in {@link #initialize()}
+ * @param nettyClientConfig settings of the outbound {@link BrokerOuterAPI} client
+ */
+ public BrokerContainer(
+ final BrokerContainerConfig brokerContainerConfig,
+ final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig
+ ) {
+ this.brokerContainerConfig = brokerContainerConfig;
+ this.nettyServerConfig = nettyServerConfig;
+ this.nettyClientConfig = nettyClientConfig;
+
+ this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+
+ // NOTE: "this" escapes to the processor and the housekeeping service before construction finishes.
+ this.brokerContainerProcessor = new BrokerContainerProcessor(this);
+ // The processor is given this container's own list instance.
+ // NOTE(review): presumably it keeps the reference so hooks registered later are visible to it - confirm.
+ this.brokerContainerProcessor.registerBrokerBootHook(this.brokerBootHookList);
+ this.containerClientHouseKeepingService = new ContainerClientHouseKeepingService(this);
+
+ this.configuration = new Configuration(
+ LOG,
+ BrokerPathConfigHelper.getBrokerConfigPath(),
+ this.brokerContainerConfig, this.nettyServerConfig, this.nettyClientConfig);
+ }
+
+ /**
+ * @return the container address in the form {@code <brokerContainerIP>:<listenPort>}
+ */
+ @Override
+ public String getBrokerContainerAddr() {
+ return this.brokerContainerConfig.getBrokerContainerIP() + ":" + this.nettyServerConfig.getListenPort();
+ }
+
+ /**
+ * @return the container level configuration passed to the constructor
+ */
+ @Override
+ public BrokerContainerConfig getBrokerContainerConfig() {
+ return brokerContainerConfig;
+ }
+
+ /**
+ * @return the remoting server configuration passed to the constructor
+ */
+ @Override
+ public NettyServerConfig getNettyServerConfig() {
+ return nettyServerConfig;
+ }
+
+ /**
+ * @return the remoting client configuration passed to the constructor
+ */
+ public NettyClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+
+ /**
+ * @return the outbound API client owned by this container
+ */
+ @Override
+ public BrokerOuterAPI getBrokerOuterAPI() {
+ return brokerOuterAPI;
+ }
+
+ /**
+ * @return the main remoting server, or {@code null} if {@link #initialize()} has not been called yet
+ */
+ @Override
+ public RemotingServer getRemotingServer() {
+ return remotingServer;
+ }
+
+ /**
+ * @return the configuration holder created in the constructor
+ */
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+  * Creates the remoting servers and the request executor, registers the container processor and
+  * schedules the periodic background tasks.
+  *
+  * <p>The fast remoting server listens two ports below the main listen port. A name server refresh
+  * task is scheduled when an address is configured, or when fetching through the address server is
+  * enabled; a metadata refresh task is always scheduled.
+  *
+  * @return always {@code true}
+  */
+ public boolean initialize() {
+     this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService);
+     final int fastListenPort = this.nettyServerConfig.getListenPort() - 2;
+     this.fastRemotingServer = this.remotingServer.newRemotingServer(fastListenPort);
+
+     // One worker thread with a bounded queue serves every container level request.
+     this.brokerContainerExecutor = new ThreadPoolExecutor(1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
+         new LinkedBlockingQueue<>(10000), new ThreadFactoryImpl("SharedBrokerThread_"));
+
+     this.registerProcessor();
+
+     final String configuredNamesrvAddr = this.brokerContainerConfig.getNamesrvAddr();
+     if (configuredNamesrvAddr != null) {
+         this.brokerOuterAPI.updateNameServerAddressList(configuredNamesrvAddr);
+         LOG.info("Set user specified name server address: {}", configuredNamesrvAddr);
+         // The task re-reads the config on every run so that a changed address is picked up.
+         final Runnable reapplyNamesrvAddr = new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+             @Override
+             public void run2() {
+                 try {
+                     BrokerContainer.this.brokerOuterAPI.updateNameServerAddressList(BrokerContainer.this.brokerContainerConfig.getNamesrvAddr());
+                 } catch (Throwable e) {
+                     LOG.error("ScheduledTask fetchNameServerAddr exception", e);
+                 }
+             }
+         };
+         this.scheduledExecutorService.scheduleAtFixedRate(reapplyNamesrvAddr, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+     } else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
+         final Runnable fetchNamesrvAddr = new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+             @Override
+             public void run2() {
+                 try {
+                     BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
+                 } catch (Throwable e) {
+                     LOG.error("ScheduledTask fetchNameServerAddr exception", e);
+                 }
+             }
+         };
+         this.scheduledExecutorService.scheduleAtFixedRate(fetchNamesrvAddr, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+     }
+
+     final Runnable refreshMetadata = new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+         @Override
+         public void run2() {
+             try {
+                 BrokerContainer.this.brokerOuterAPI.refreshMetadata();
+             } catch (Exception e) {
+                 LOG.error("ScheduledTask refresh metadata exception", e);
+             }
+         }
+     };
+     this.scheduledExecutorService.scheduleAtFixedRate(refreshMetadata, 1, 5, TimeUnit.SECONDS);
+
+     return true;
+ }
+
+ /**
+ * Installs the container processor as the default processor of both remoting servers, backed by
+ * the shared container executor. Must run after the servers and the executor have been created.
+ */
+ private void registerProcessor() {
+ remotingServer.registerDefaultProcessor(brokerContainerProcessor, this.brokerContainerExecutor);
+ fastRemotingServer.registerDefaultProcessor(brokerContainerProcessor, this.brokerContainerExecutor);
+ }
+
+ /**
+ * Starts the main remoting server, then the fast remoting server, then the outer API client.
+ * Servers that are still {@code null} (because {@link #initialize()} was not called) are skipped.
+ *
+ * @throws Exception declared by the interface; propagated from the components being started
+ */
+ @Override
+ public void start() throws Exception {
+ if (this.remotingServer != null) {
+ this.remotingServer.start();
+ }
+
+ if (this.fastRemotingServer != null) {
+ this.fastRemotingServer.start();
+ }
+
+ if (this.brokerOuterAPI != null) {
+ this.brokerOuterAPI.start();
+ }
+ }
+
+ /**
+  * Stops every hosted broker and then the container's own infrastructure.
+  *
+  * <p>Order: slave brokers, master brokers, remoting servers, request executor, scheduled tasks,
+  * outer API client. The scheduler is stopped before the outer API client because its periodic
+  * tasks call into that client.
+  */
+ @Override
+ public void shutdown() {
+     // Shutdown slave brokers
+     for (InnerSalveBrokerController slaveBrokerController : slaveBrokerControllers.values()) {
+         slaveBrokerController.shutdown();
+     }
+
+     slaveBrokerControllers.clear();
+
+     // Shutdown master brokers
+     for (BrokerController masterBrokerController : masterBrokerControllers.values()) {
+         masterBrokerController.shutdown();
+     }
+
+     masterBrokerControllers.clear();
+
+     // Shutdown the remoting servers so that no further container requests are accepted
+     if (this.remotingServer != null) {
+         this.remotingServer.shutdown();
+     }
+
+     if (this.fastRemotingServer != null) {
+         this.fastRemotingServer.shutdown();
+     }
+
+     // Shutdown the request executors
+     ThreadUtils.shutdown(this.brokerContainerExecutor);
+
+     // Stop the periodic name server / metadata refresh tasks; previously they kept firing
+     // against an already shut down BrokerOuterAPI for as long as the JVM stayed alive.
+     this.scheduledExecutorService.shutdown();
+
+     if (this.brokerOuterAPI != null) {
+         this.brokerOuterAPI.shutdown();
+     }
+ }
+
+ /**
+ * Registers an RPC hook on the container's outer API client.
+ *
+ * @param rpcHook hook to register
+ */
+ public void registerClientRPCHook(RPCHook rpcHook) {
+ this.getBrokerOuterAPI().registerRPCHook(rpcHook);
+ }
+
+ /**
+ * Removes the RPC hooks from the container's outer API client.
+ */
+ public void clearClientRPCHook() {
+ this.getBrokerOuterAPI().clearRPCHook();
+ }
+
+ /**
+ * @return the internal, mutable list of registered boot hooks (not a copy)
+ */
+ public List<BrokerBootHook> getBrokerBootHookList() {
+ return brokerBootHookList;
+ }
+
+ /**
+  * Adds a boot hook that is executed around the start of every broker added afterwards.
+  *
+  * @param brokerBootHook hook to register, must not be {@code null}
+  * @throws NullPointerException if {@code brokerBootHook} is {@code null}
+  */
+ public void registerBrokerBootHook(BrokerBootHook brokerBootHook) {
+     // Reject null before touching the list: previously null was added first and only the
+     // subsequent hookName() call failed, leaving an entry that breaks every later hook loop.
+     if (brokerBootHook == null) {
+         throw new NullPointerException("brokerBootHook must not be null");
+     }
+     this.brokerBootHookList.add(brokerBootHook);
+     LOG.info("register BrokerBootHook, {}", brokerBootHook.hookName());
+ }
+
+ /**
+  * Adds a broker as master or slave depending on its id and role.
+  *
+  * <p>A broker with the master id and a non-slave role is added as master; a broker with any other
+  * id and the slave role is added as slave. Every other combination is rejected.
+  *
+  * @param brokerConfig broker configuration
+  * @param storeConfig message store configuration
+  * @return the created controller, or {@code null} when id and role are inconsistent
+  * @throws Exception if the broker cannot be added
+  */
+ @Override
+ public InnerBrokerController addBroker(final BrokerConfig brokerConfig,
+     final MessageStoreConfig storeConfig) throws Exception {
+     final boolean hasMasterId = brokerConfig.getBrokerId() == MixAll.MASTER_ID;
+     final boolean hasSlaveRole = storeConfig.getBrokerRole() == BrokerRole.SLAVE;
+
+     // Master id with slave role, or non-master id with a master role: inconsistent.
+     if (hasMasterId == hasSlaveRole) {
+         return null;
+     }
+     if (hasMasterId) {
+         return this.addMasterBroker(brokerConfig, storeConfig);
+     }
+     return this.addSlaveBroker(brokerConfig, storeConfig);
+ }
+
+ /**
+  * Creates and initializes a master broker inside this container. The broker is not started.
+  *
+  * <p>After a successful initialization, every slave broker that has no in-process master store
+  * yet is pointed at the new master's message store.
+  *
+  * @param masterBrokerConfig the master broker config; it is marked as running in a container
+  * @param storeConfig the message store config; duplication must be disabled
+  * @return the initialized master broker controller
+  * @throws Exception if duplication is enabled, the broker identity is already registered, or
+  * initialization fails (the failed broker is shut down and unregistered first)
+  */
+ public InnerBrokerController addMasterBroker(final BrokerConfig masterBrokerConfig,
+     final MessageStoreConfig storeConfig) throws Exception {
+
+     masterBrokerConfig.setInBrokerContainer(true);
+     if (storeConfig.isDuplicationEnable()) {
+         LOG.error("Can not add broker to container when duplicationEnable is true currently");
+         throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+     }
+     InnerBrokerController masterBroker = new InnerBrokerController(this, masterBrokerConfig, storeConfig);
+     BrokerIdentity brokerIdentity = new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(),
+         masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId());
+     final BrokerController previousBroker = masterBrokerControllers.putIfAbsent(brokerIdentity, masterBroker);
+     if (previousBroker != null) {
+         throw new Exception(masterBrokerConfig.getCanonicalName() + " has already been added to current broker");
+     }
+
+     // New master broker added, initialize it
+     try {
+         BrokerLogbackConfigurator.doConfigure(masterBrokerConfig);
+         final boolean initResult = masterBroker.initialize();
+         if (!initResult) {
+             // Cleanup happens exactly once, in the catch block below. The previous code also
+             // shut the broker down here, so a failed init triggered shutdown() twice.
+             throw new Exception("Failed to init master broker " + masterBrokerConfig.getCanonicalName());
+         }
+
+         for (InnerSalveBrokerController slaveBroker : this.getSlaveBrokers()) {
+             if (slaveBroker.getMessageStore().getMasterStoreInProcess() == null) {
+                 slaveBroker.getMessageStore().setMasterStoreInProcess(masterBroker.getMessageStore());
+             }
+         }
+     } catch (Exception e) {
+         // Remove the failed master broker and throw the exception
+         masterBroker.shutdown();
+         masterBrokerControllers.remove(brokerIdentity);
+         throw new Exception("Failed to initialize master broker " + masterBrokerConfig.getCanonicalName(), e);
+     }
+     return masterBroker;
+ }
+
+ /**
+  * This function will create a slave broker along with the main broker, and start it with a different port.
+  *
+  * <p>The store's {@code accessMessageInMemoryMaxRatio} is lowered by 10 (never below 0) before
+  * the broker is created. After a successful initialization the slave is pointed at the message
+  * store of a master hosted in this container, if there is one.
+  *
+  * @param slaveBrokerConfig the specific slave broker config; it is marked as running in a container
+  * @param storeConfig the message store config; duplication must be disabled
+  * @return the initialized slave broker controller
+  * @throws Exception if duplication is enabled, the broker identity is already registered, or
+  * initialization fails (the failed broker is shut down and unregistered first)
+  */
+ public InnerSalveBrokerController addSlaveBroker(final BrokerConfig slaveBrokerConfig,
+     final MessageStoreConfig storeConfig) throws Exception {
+
+     slaveBrokerConfig.setInBrokerContainer(true);
+     if (storeConfig.isDuplicationEnable()) {
+         LOG.error("Can not add broker to container when duplicationEnable is true currently");
+         throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+     }
+
+     int ratio = storeConfig.getAccessMessageInMemoryMaxRatio() - 10;
+     storeConfig.setAccessMessageInMemoryMaxRatio(Math.max(ratio, 0));
+     InnerSalveBrokerController slaveBroker = new InnerSalveBrokerController(this, slaveBrokerConfig, storeConfig);
+     BrokerIdentity brokerIdentity = new BrokerIdentity(slaveBrokerConfig.getBrokerClusterName(),
+         slaveBrokerConfig.getBrokerName(), slaveBrokerConfig.getBrokerId());
+     final InnerSalveBrokerController previousBroker = slaveBrokerControllers.putIfAbsent(brokerIdentity, slaveBroker);
+     if (previousBroker != null) {
+         throw new Exception(slaveBrokerConfig.getCanonicalName() + " has already been added to current broker");
+     }
+
+     // New slave broker added, initialize it
+     try {
+         BrokerLogbackConfigurator.doConfigure(slaveBrokerConfig);
+         final boolean initResult = slaveBroker.initialize();
+         if (!initResult) {
+             // Cleanup happens exactly once, in the catch block below. The previous code also
+             // shut the broker down here, so a failed init triggered shutdown() twice.
+             throw new Exception("Failed to init slave broker " + slaveBrokerConfig.getCanonicalName());
+         }
+         BrokerController masterBroker = this.peekMasterBroker();
+         if (slaveBroker.getMessageStore().getMasterStoreInProcess() == null && masterBroker != null) {
+             slaveBroker.getMessageStore().setMasterStoreInProcess(masterBroker.getMessageStore());
+         }
+     } catch (Exception e) {
+         // Remove the failed slave broker and throw the exception
+         slaveBroker.shutdown();
+         slaveBrokerControllers.remove(brokerIdentity);
+         throw new Exception("Failed to initialize slave broker " + slaveBrokerConfig.getCanonicalName(), e);
+     }
+     return slaveBroker;
+ }
+
+ /**
+  * Unregisters the broker with the given identity and shuts it down.
+  *
+  * <p>When a master is removed, every slave in this container is re-pointed to the message store
+  * of a remaining master, or to {@code null} when no master is left. This happens before the
+  * removed master is shut down.
+  *
+  * @param brokerIdentity identity of the broker to remove
+  * @return the removed (already shut down) controller, or {@code null} if no such broker is hosted
+  * @throws Exception declared by the interface
+  */
+ @Override
+ public BrokerController removeBroker(final BrokerIdentity brokerIdentity) throws Exception {
+
+     InnerSalveBrokerController slaveBroker = slaveBrokerControllers.remove(brokerIdentity);
+     if (slaveBroker != null) {
+         slaveBroker.shutdown();
+         return slaveBroker;
+     }
+
+     BrokerController masterBroker = masterBrokerControllers.remove(brokerIdentity);
+     if (masterBroker == null) {
+         // Nothing was removed, so the slaves' master store must stay untouched. Previously an
+         // unknown identity still re-pointed every slave to an arbitrary master.
+         return null;
+     }
+
+     BrokerController nextMasterBroker = this.peekMasterBroker();
+     for (InnerSalveBrokerController slave : this.getSlaveBrokers()) {
+         if (nextMasterBroker == null) {
+             slave.getMessageStore().setMasterStoreInProcess(null);
+         } else {
+             slave.getMessageStore().setMasterStoreInProcess(nextMasterBroker.getMessageStore());
+         }
+     }
+
+     masterBroker.shutdown();
+     return masterBroker;
+ }
+
+ /**
+  * Looks a hosted broker up by identity, checking the slaves before the masters.
+  *
+  * @param brokerIdentity identity of the broker
+  * @return the matching controller, or {@code null} if none is hosted
+  */
+ @Override
+ public BrokerController getBroker(final BrokerIdentity brokerIdentity) {
+     final BrokerController matchingSlave = slaveBrokerControllers.get(brokerIdentity);
+     if (matchingSlave == null) {
+         return masterBrokerControllers.get(brokerIdentity);
+     }
+     return matchingSlave;
+ }
+
+ /**
+ * @return the values view of the master broker map (not a copy)
+ */
+ @Override
+ public Collection<InnerBrokerController> getMasterBrokers() {
+ return masterBrokerControllers.values();
+ }
+
+ /**
+ * @return the values view of the slave broker map (not a copy)
+ */
+ @Override
+ public Collection<InnerSalveBrokerController> getSlaveBrokers() {
+ return slaveBrokerControllers.values();
+ }
+
+ /**
+  * @return a new list holding all master brokers followed by all slave brokers
+  */
+ @Override
+ public List<BrokerController> getBrokerControllers() {
+     final List<BrokerController> allBrokers = new ArrayList<>(this.getMasterBrokers());
+     allBrokers.addAll(this.getSlaveBrokers());
+     return allBrokers;
+ }
+
+ /**
+  * Returns one of the hosted master brokers without removing it. Which one is unspecified.
+  *
+  * @return a master broker controller, or {@code null} if no master is hosted
+  */
+ @Override
+ public BrokerController peekMasterBroker() {
+     // Use a single iterator instead of isEmpty() followed by iterator().next(): with a
+     // concurrent removal in between, the old check-then-act could throw NoSuchElementException.
+     for (BrokerController masterBroker : masterBrokerControllers.values()) {
+         return masterBroker;
+     }
+     return null;
+ }
+
+ /**
+  * Finds a hosted broker by broker name, searching the masters before the slaves.
+  *
+  * @param brokerName broker name to look for
+  * @return the first controller whose configured broker name equals {@code brokerName}, or
+  * {@code null} if there is none
+  */
+ public BrokerController findBrokerControllerByBrokerName(String brokerName) {
+     final BrokerController matchingMaster = firstWithBrokerName(masterBrokerControllers.values(), brokerName);
+     if (matchingMaster != null) {
+         return matchingMaster;
+     }
+     return firstWithBrokerName(slaveBrokerControllers.values(), brokerName);
+ }
+
+ /** Returns the first candidate whose configured broker name equals {@code brokerName}, else null. */
+ private static BrokerController firstWithBrokerName(Collection<? extends BrokerController> candidates,
+     String brokerName) {
+     for (BrokerController candidate : candidates) {
+         if (candidate.getBrokerConfig().getBrokerName().equals(brokerName)) {
+             return candidate;
+         }
+     }
+     return null;
+ }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
new file mode 100644
index 0000000..28a5242
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+/**
+ * Container level settings of a {@link BrokerContainer}: plain fields with accessors.
+ */
+public class BrokerContainerConfig {
+
+ // Taken from the ROCKETMQ_HOME system property, falling back to the environment variable.
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+ // Taken from the name server system property, falling back to the environment variable; may be null.
+ @ImportantField
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+ // Only consulted by BrokerContainer#initialize when namesrvAddr is null.
+ @ImportantField
+ private boolean fetchNamesrvAddrByAddressServer = false;
+
+ // Host part of the address returned by BrokerContainer#getBrokerContainerAddr.
+ // NOTE(review): there is no setter, so this always stays the detected local address - confirm this is intended.
+ @ImportantField
+ private String brokerContainerIP = RemotingUtil.getLocalAddress();
+
+ // NOTE(review): presumably the config files of brokers to load at startup - verify against BrokerContainerStartup.
+ private String brokerConfigPaths = null;
+
+ private boolean compatibleWithOldNameSrv = true;
+
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public boolean isFetchNamesrvAddrByAddressServer() {
+ return fetchNamesrvAddrByAddressServer;
+ }
+
+ public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+ this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+ }
+
+ public String getBrokerContainerIP() {
+ return brokerContainerIP;
+ }
+
+ public String getBrokerConfigPaths() {
+ return brokerConfigPaths;
+ }
+
+ public void setBrokerConfigPaths(String brokerConfigPaths) {
+ this.brokerConfigPaths = brokerConfigPaths;
+ }
+
+ public boolean isCompatibleWithOldNameSrv() {
+ return compatibleWithOldNameSrv;
+ }
+
+ public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
+ this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
+ }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
new file mode 100644
index 0000000..6893882
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.AddBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.RemoveBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public class BrokerContainerProcessor implements NettyRequestProcessor {
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerContainer brokerContainer;
+ // Not set by the constructor; BrokerContainer's constructor supplies it through registerBrokerBootHook.
+ private List<BrokerBootHook> brokerBootHookList;
+
+ /**
+ * @param brokerContainer the container whose brokers this processor adds and removes
+ */
+ public BrokerContainerProcessor(BrokerContainer brokerContainer) {
+ this.brokerContainer = brokerContainer;
+ }
+
+ /**
+  * Dispatches a container level request to its handler by request code.
+  *
+  * @param ctx channel context of the caller
+  * @param request the incoming command
+  * @return the handler's response, or {@code null} for a request code this processor does not handle
+  * @throws Exception propagated from the handler
+  */
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+     final int requestCode = request.getCode();
+     if (requestCode == RequestCode.ADD_BROKER) {
+         return this.addBroker(ctx, request);
+     }
+     if (requestCode == RequestCode.REMOVE_BROKER) {
+         return this.removeBroker(ctx, request);
+     }
+     if (requestCode == RequestCode.GET_BROKER_CONFIG) {
+         return this.getBrokerConfig(ctx, request);
+     }
+     if (requestCode == RequestCode.UPDATE_BROKER_CONFIG) {
+         return this.updateBrokerConfig(ctx, request);
+     }
+     return null;
+ }
+
+ /**
+ * @return always {@code false}
+ */
+ @Override public boolean rejectRequest() {
+ return false;
+ }
+
+ /**
+  * Handles {@code ADD_BROKER}: builds the broker configuration, adds the broker to the container,
+  * runs the boot hooks and starts the broker.
+  *
+  * <p>The properties are loaded from the config path in the request header when one is given,
+  * otherwise from the request body. The HA listen port is always set to the broker's listen port
+  * plus one. Master roles force the broker id to the master id; a slave must have an id above zero.
+  *
+  * @param ctx channel context of the caller, used for logging the remote address
+  * @param request the add-broker command
+  * @return a response with {@code SUCCESS}, or {@code SYSTEM_ERROR} plus a remark describing the failure
+  * @throws Exception if the request header or body cannot be decoded
+  */
+ private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx,
+     RemotingCommand request) throws Exception {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+     final AddBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(AddBrokerRequestHeader.class);
+
+     LOGGER.info("addBroker called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+     Properties brokerProperties = null;
+     String configPath = requestHeader.getConfigPath();
+     final boolean hasConfigPath = configPath != null && !configPath.isEmpty();
+
+     if (hasConfigPath) {
+         BrokerStartup.SystemConfigFileHelper configFileHelper = new BrokerStartup.SystemConfigFileHelper();
+         configFileHelper.setFile(configPath);
+
+         try {
+             brokerProperties = configFileHelper.loadConfig();
+         } catch (Exception e) {
+             // Best effort: the missing properties are reported to the caller just below.
+             // Pass the exception as the throwable argument so its stack trace is logged.
+             LOGGER.error("addBroker load config from " + configPath + " failed", e);
+         }
+     } else {
+         byte[] body = request.getBody();
+         if (body != null) {
+             String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+             brokerProperties = MixAll.string2Properties(bodyStr);
+         }
+     }
+
+     if (brokerProperties == null) {
+         LOGGER.error("addBroker properties empty");
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("addBroker properties empty");
+         return response;
+     }
+
+     BrokerConfig brokerConfig = new BrokerConfig();
+     MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+     MixAll.properties2Object(brokerProperties, brokerConfig);
+     MixAll.properties2Object(brokerProperties, messageStoreConfig);
+
+     messageStoreConfig.setHaListenPort(brokerConfig.getListenPort() + 1);
+
+     if (hasConfigPath) {
+         brokerConfig.setBrokerConfigPath(configPath);
+     }
+
+     switch (messageStoreConfig.getBrokerRole()) {
+         case ASYNC_MASTER:
+         case SYNC_MASTER:
+             brokerConfig.setBrokerId(MixAll.MASTER_ID);
+             break;
+         case SLAVE:
+             if (brokerConfig.getBrokerId() <= 0) {
+                 response.setCode(ResponseCode.SYSTEM_ERROR);
+                 response.setRemark("slave broker id must be > 0");
+                 return response;
+             }
+             break;
+         default:
+             break;
+     }
+
+     // Required ordering: totalReplicas >= inSyncReplicas >= minInSyncReplicas.
+     if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+         || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+         || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("invalid replicas number");
+         return response;
+     }
+
+     BrokerController brokerController;
+     try {
+         brokerController = this.brokerContainer.addBroker(brokerConfig, messageStoreConfig);
+     } catch (Exception e) {
+         LOGGER.error("addBroker exception", e);
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark(e.getMessage());
+         return response;
+     }
+
+     if (brokerController == null) {
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("add broker return null");
+         return response;
+     }
+
+     brokerController.getConfiguration().registerConfig(brokerProperties);
+     try {
+         for (BrokerBootHook brokerBootHook : brokerBootHookList) {
+             brokerBootHook.executeBeforeStart(brokerController, brokerProperties);
+         }
+         brokerController.start();
+
+         for (BrokerBootHook brokerBootHook : brokerBootHookList) {
+             brokerBootHook.executeAfterStart(brokerController, brokerProperties);
+         }
+     } catch (Exception e) {
+         LOGGER.error("start broker exception", e);
+         this.discardFailedBroker(brokerConfig, brokerController);
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("start broker failed, " + e);
+         return response;
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+     return response;
+ }
+
+ /**
+  * Unregisters a broker whose start failed and makes sure it is shut down.
+  *
+  * <p>{@code BrokerContainer#removeBroker} already shuts down the controller it removes, so
+  * {@code shutdown()} is only called here when the container did not hand back this controller.
+  * A failure while removing is logged instead of propagated so that the caller can still send
+  * its error response.
+  */
+ private void discardFailedBroker(BrokerConfig brokerConfig, BrokerController brokerController) {
+     BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+         brokerConfig.getBrokerName(),
+         brokerConfig.getBrokerId());
+     BrokerController removedBroker = null;
+     try {
+         removedBroker = this.brokerContainer.removeBroker(brokerIdentity);
+     } catch (Exception removeException) {
+         LOGGER.error("remove broker after failed start exception", removeException);
+     }
+     if (removedBroker != brokerController) {
+         brokerController.shutdown();
+     }
+ }
+
+ /**
+  * Handles a remove-broker request by removing the broker identified by the request header
+  * (cluster name, broker name, broker id) from the container.
+  *
+  * @param ctx     channel context of the caller, used only for logging the remote address
+  * @param request request whose custom header is a {@code RemoveBrokerRequestHeader}
+  * @return SUCCESS when the container returned a controller, BROKER_NOT_EXIST when it returned
+  *         null, SYSTEM_ERROR when the removal threw
+  * @throws RemotingCommandException if the request header cannot be decoded
+  */
+ private synchronized RemotingCommand removeBroker(ChannelHandlerContext ctx,
+     RemotingCommand request) throws RemotingCommandException {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+     final RemoveBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(RemoveBrokerRequestHeader.class);
+
+     LOGGER.info("removeBroker called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+     BrokerIdentity brokerIdentity = new BrokerIdentity(requestHeader.getBrokerClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerId());
+
+     BrokerController brokerController;
+     try {
+         brokerController = this.brokerContainer.removeBroker(brokerIdentity);
+     } catch (Exception e) {
+         // Keep the stack trace on the server side: the remark only carries e.getMessage(),
+         // which may be null and never includes the cause.
+         LOGGER.error("removeBroker exception", e);
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark(e.getMessage());
+         return response;
+     }
+
+     if (brokerController != null) {
+         response.setCode(ResponseCode.SUCCESS);
+         response.setRemark(null);
+     } else {
+         response.setCode(ResponseCode.BROKER_NOT_EXIST);
+         response.setRemark("Broker not exist");
+     }
+     return response;
+ }
+
+ /**
+ * Replaces the list of boot hooks held by this processor.
+ *
+ * @param brokerBootHookList hooks to store; the reference is kept as-is (no copy is made)
+ */
+ public void registerBrokerBootHook(List<BrokerBootHook> brokerBootHookList) {
+ this.brokerBootHookList = brokerBootHookList;
+ }
+
+ /**
+  * Applies the properties carried in the request body to the container configuration.
+  * A request without a body is answered with SUCCESS and changes nothing.
+  *
+  * @param ctx     channel context of the caller, used for logging
+  * @param request request whose body is a properties string
+  * @return SUCCESS, or SYSTEM_ERROR when the body cannot be decoded or parsed
+  */
+ private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+     LOGGER.info("updateSharedBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+     final byte[] payload = request.getBody();
+     if (payload == null) {
+         // Nothing to apply.
+         response.setCode(ResponseCode.SUCCESS);
+         response.setRemark(null);
+         return response;
+     }
+
+     try {
+         final Properties newConfig = MixAll.string2Properties(new String(payload, MixAll.DEFAULT_CHARSET));
+         if (newConfig == null) {
+             LOGGER.error("string2Properties error");
+             response.setCode(ResponseCode.SYSTEM_ERROR);
+             response.setRemark("string2Properties error");
+             return response;
+         }
+         LOGGER.info("updateSharedBrokerConfig, new config: [{}] client: {} ", newConfig, ctx.channel().remoteAddress());
+         this.brokerContainer.getConfiguration().update(newConfig);
+     } catch (UnsupportedEncodingException e) {
+         LOGGER.error("", e);
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark("UnsupportedEncodingException " + e);
+         return response;
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+     return response;
+ }
+
+ /**
+ * Returns the container configuration: the formatted config string as the response body
+ * (when it is non-empty) and the configuration data version in the response header.
+ *
+ * @param ctx channel context of the caller (not used by this method)
+ * @param request the incoming request (not used by this method)
+ * @return SUCCESS, or SYSTEM_ERROR when the config string cannot be encoded
+ */
+ private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
+ final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader();
+
+ String content = this.brokerContainer.getConfiguration().getAllConfigsFormatString();
+ // The body is only set when there is something to send.
+ if (content != null && content.length() > 0) {
+ try {
+ response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+ } catch (UnsupportedEncodingException e) {
+ LOGGER.error("", e);
+
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UnsupportedEncodingException " + e);
+ return response;
+ }
+ }
+
+ responseHeader.setVersion(this.brokerContainer.getConfiguration().getDataVersionJson());
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
new file mode 100644
index 0000000..f4f5d4a
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerContainerStartup {
+ // Short names of the command-line options registered in buildCommandlineOptions.
+ private static final char BROKER_CONTAINER_CONFIG_OPTION = 'c';
+ private static final char BROKER_CONFIG_OPTION = 'b';
+ private static final char PRINT_PROPERTIES_OPTION = 'p';
+ // Must equal the registered short name "m" (printImportantConfig); the previous value "pm"
+ // matched no registered option, so the print-important-config branch could never run.
+ private static final String PRINT_IMPORTANT_PROPERTIES_OPTION = "m";
+ // Properties loaded from the container config file by createBrokerContainer.
+ public static Properties properties = null;
+ // Parsed command line, set by createBrokerContainer.
+ public static CommandLine commandLine = null;
+ // Path of the container config file given with -c, if any.
+ public static String configFile = null;
+ public static InternalLogger log;
+ public static SystemConfigFileHelper configFileHelper = new SystemConfigFileHelper();
+ public static String rocketmqHome = null;
+ public static JoranConfigurator configurator = new JoranConfigurator();
+
+ /**
+ * Entry point: creates the broker container from the command-line arguments, starts it,
+ * then creates and starts the brokers resolved from the configured broker config paths.
+ *
+ * @param args command-line arguments
+ */
+ public static void main(String[] args) {
+ final BrokerContainer brokerContainer = startBrokerContainer(createBrokerContainer(args));
+ createAndStartBrokers(brokerContainer);
+ }
+
+ /**
+ * Start brokerController, for old version compatibility.
+ * On success a boot tip is logged and printed to stdout; on any failure the stack trace is
+ * printed and the JVM exits with status -1.
+ *
+ * @param controller brokerController
+ * @return brokerController
+ */
+ public static BrokerController start(BrokerController controller) {
+ try {
+
+ controller.start();
+
+ String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+ if (null != controller.getBrokerConfig().getNamesrvAddr()) {
+ tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
+ }
+
+ log.info(tip);
+ System.out.printf("%s%n", tip);
+ return controller;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ // Only reached if System.exit returns; required by the compiler.
+ return null;
+ }
+
+ /**
+ * Creates and starts a broker container on the default port, then creates one broker in it
+ * from the container's own config file and properties (the static configFile/properties
+ * fields populated by createBrokerContainer).
+ *
+ * @param args command-line arguments
+ * @return the broker controller added to the container, or null if adding failed
+ */
+ public static BrokerController createBrokerController(String[] args) {
+ final BrokerContainer sharedBrokerController = startBrokerContainer(createBrokerContainer(args, true));
+ return createAndInitializeBroker(sharedBrokerController, configFile, properties);
+ }
+
+ /**
+  * Creates, registers and starts one broker per configured broker config path.
+  * A config file that cannot be loaded terminates the JVM with status -1.
+  *
+  * @param brokerContainer container that hosts the brokers
+  * @return the controllers that were added to the container (empty when no path is configured)
+  */
+ public static List<BrokerController> createAndStartBrokers(BrokerContainer brokerContainer) {
+     final String[] paths = parseBrokerConfigPath();
+     final List<BrokerController> started = new ArrayList<>();
+     if (paths == null || paths.length == 0) {
+         return started;
+     }
+
+     final SystemConfigFileHelper loader = new SystemConfigFileHelper();
+     for (String path : paths) {
+         System.out.printf("Start broker from config file path %s%n", path);
+         loader.setFile(path);
+
+         Properties loaded = null;
+         try {
+             loaded = loader.loadConfig();
+         } catch (Exception e) {
+             e.printStackTrace();
+             System.exit(-1);
+         }
+
+         final BrokerController controller = createAndInitializeBroker(brokerContainer, path, loaded);
+         if (controller == null) {
+             continue;
+         }
+         started.add(controller);
+         startBrokerController(brokerContainer, controller, loaded);
+     }
+
+     return started;
+ }
+
+ /**
+ * Resolves the broker config file paths. The -b option wins; otherwise the list is read from
+ * the brokerConfigPaths value of the container config file given with -c.
+ * Failure to load the container config file terminates the JVM with status -1.
+ *
+ * @return the paths split on ':', or null when no list is configured
+ */
+ public static String[] parseBrokerConfigPath() {
+ String brokerConfigList = null;
+ if (commandLine.hasOption(BROKER_CONFIG_OPTION)) {
+ brokerConfigList = commandLine.getOptionValue(BROKER_CONFIG_OPTION);
+
+ } else if (commandLine.hasOption(BROKER_CONTAINER_CONFIG_OPTION)) {
+ String brokerContainerConfigPath = commandLine.getOptionValue(BROKER_CONTAINER_CONFIG_OPTION);
+ if (brokerContainerConfigPath != null) {
+ BrokerContainerConfig brokerContainerConfig = new BrokerContainerConfig();
+ // Local helper; shadows the static configFileHelper field.
+ SystemConfigFileHelper configFileHelper = new SystemConfigFileHelper();
+ configFileHelper.setFile(brokerContainerConfigPath);
+ Properties brokerContainerProperties = null;
+ try {
+ brokerContainerProperties = configFileHelper.loadConfig();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ if (brokerContainerProperties != null) {
+ MixAll.properties2Object(brokerContainerProperties, brokerContainerConfig);
+ }
+ brokerConfigList = brokerContainerConfig.getBrokerConfigPaths();
+ }
+ }
+
+ if (brokerConfigList != null) {
+ return brokerConfigList.split(":");
+ }
+ return null;
+ }
+
+ /**
+ * Builds the broker and message-store configs from the given properties, validates them and
+ * adds the broker to the container. Validation failures and exceptions terminate the JVM.
+ *
+ * @param brokerContainer container to add the broker to
+ * @param filePath path recorded as the broker's config path
+ * @param brokerProperties properties for the broker; may be null, in which case defaults apply
+ * @return the controller returned by the container, or null when the container returned null
+ */
+ public static BrokerController createAndInitializeBroker(BrokerContainer brokerContainer,
+ String filePath, Properties brokerProperties) {
+
+ final BrokerConfig brokerConfig = new BrokerConfig();
+ final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+ if (brokerProperties != null) {
+ properties2SystemEnv(brokerProperties);
+ MixAll.properties2Object(brokerProperties, brokerConfig);
+ MixAll.properties2Object(brokerProperties, messageStoreConfig);
+ }
+
+ // The HA port is always derived from the broker listen port.
+ messageStoreConfig.setHaListenPort(brokerConfig.getListenPort() + 1);
+
+ // Command-line values are applied after the file values.
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+ switch (messageStoreConfig.getBrokerRole()) {
+ case ASYNC_MASTER:
+ case SYNC_MASTER:
+ // Masters always get the master id, whatever was configured.
+ brokerConfig.setBrokerId(MixAll.MASTER_ID);
+ break;
+ case SLAVE:
+ if (brokerConfig.getBrokerId() <= 0) {
+ System.out.printf("Slave's brokerId must be > 0%n");
+ System.exit(-3);
+ }
+
+ break;
+ default:
+ break;
+ }
+
+ // Requires totalReplicas >= inSyncReplicas >= minInSyncReplicas.
+ if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+ || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+ || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+ System.out.printf("invalid replicas number%n");
+ System.exit(-3);
+ }
+
+ brokerConfig.setBrokerConfigPath(filePath);
+
+ // Note: this reassigns the static logger to a broker-specific one.
+ log = InternalLoggerFactory.getLogger(brokerConfig.getLoggerIdentifier() + LoggerName.BROKER_LOGGER_NAME);
+ MixAll.printObjectProperties(log, brokerConfig);
+ MixAll.printObjectProperties(log, messageStoreConfig);
+
+ try {
+ BrokerController brokerController = brokerContainer.addBroker(brokerConfig, messageStoreConfig);
+ if (brokerController != null) {
+ brokerController.getConfiguration().registerConfig(brokerProperties);
+ return brokerController;
+ } else {
+ System.out.printf("Add broker [%s-%s] failed.%n", brokerConfig.getBrokerName(), brokerConfig.getBrokerId());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ return null;
+ }
+
+ /**
+ * Starts the broker container and reports success on the log and on stdout.
+ * Any failure prints the stack trace and terminates the JVM with status -1.
+ *
+ * @param brokerContainer container to start
+ * @return the same container once started
+ */
+ public static BrokerContainer startBrokerContainer(BrokerContainer brokerContainer) {
+ try {
+
+ brokerContainer.start();
+
+ String tip = "The broker container boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+ if (null != brokerContainer.getBrokerContainerConfig().getNamesrvAddr()) {
+ tip += " and name server is " + brokerContainer.getBrokerContainerConfig().getNamesrvAddr();
+ }
+
+ log.info(tip);
+ System.out.printf("%s%n", tip);
+ return brokerContainer;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ // Only reached if System.exit returns; required by the compiler.
+ return null;
+ }
+
+ /**
+ * Starts one broker inside the container, running the container's boot hooks before and
+ * after the start. Any failure prints the stack trace and terminates the JVM with status -1.
+ *
+ * @param brokerContainer container whose boot hooks are executed
+ * @param brokerController broker to start
+ * @param brokerProperties properties handed to every boot hook
+ */
+ public static void startBrokerController(BrokerContainer brokerContainer,
+ BrokerController brokerController, Properties brokerProperties) {
+ try {
+ for (BrokerBootHook hook : brokerContainer.getBrokerBootHookList()) {
+ hook.executeBeforeStart(brokerController, brokerProperties);
+ }
+
+ brokerController.start();
+
+ for (BrokerBootHook hook : brokerContainer.getBrokerBootHookList()) {
+ hook.executeAfterStart(brokerController, brokerProperties);
+ }
+
+ String tip = String.format("Broker [%s-%s] boot success. serializeType=%s",
+ brokerController.getBrokerConfig().getBrokerName(),
+ brokerController.getBrokerConfig().getBrokerId(),
+ RemotingCommand.getSerializeTypeConfigInThisServer());
+
+ log.info(tip);
+ System.out.printf("%s%n", tip);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+
+ /**
+  * Shuts the given container down; a null argument is ignored.
+  *
+  * @param controller container to shut down, may be null
+  */
+ public static void shutdown(final BrokerContainer controller) {
+     if (controller == null) {
+         return;
+     }
+     controller.shutdown();
+ }
+
+ /**
+ * Creates a broker container from the command-line arguments without forcing the default
+ * listen port; delegates to {@code createBrokerContainer(args, false)}.
+ *
+ * @param args command-line arguments
+ * @return the initialized container
+ */
+ public static BrokerContainer createBrokerContainer(String[] args) {
+ return createBrokerContainer(args, false);
+ }
+
+ /**
+ * Parses the command line, loads the container config file, configures logging, builds and
+ * initializes the broker container and registers a JVM shutdown hook for it.
+ * Every failure path terminates the JVM (exit codes -1, -2 or -3 as seen below).
+ *
+ * @param args command-line arguments
+ * @param useDefaultPort when true the listen port is forced back to 10811 after the config
+ * file has been applied
+ * @return the initialized (not yet started) container
+ */
+ public static BrokerContainer createBrokerContainer(String[] args, boolean useDefaultPort) {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+ // Socket buffer sizes default to 128 KiB unless set through system properties.
+ if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+ NettySystemConfig.socketSndbufSize = 131072;
+ }
+
+ if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+ NettySystemConfig.socketRcvbufSize = 131072;
+ }
+
+ try {
+ //PackageConflictDetect.detectFastjson();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+ new DefaultParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
+
+ final BrokerContainerConfig brokerConfig = new BrokerContainerConfig();
+ final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+
+ nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TlsSystemConfig.TLS_ENABLE,
+ String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
+ // Default container port; the config file loaded below may override it.
+ nettyServerConfig.setListenPort(10811);
+
+ if (commandLine.hasOption(BROKER_CONTAINER_CONFIG_OPTION)) {
+ String file = commandLine.getOptionValue(BROKER_CONTAINER_CONFIG_OPTION);
+ if (file != null) {
+ configFileHelper.setFile(file);
+ configFile = file;
+ BrokerPathConfigHelper.setBrokerConfigPath(file);
+ }
+ }
+
+ // Loaded unconditionally, i.e. also when no -c option was given.
+ properties = configFileHelper.loadConfig();
+ if (properties != null) {
+ properties2SystemEnv(properties);
+ MixAll.properties2Object(properties, brokerConfig);
+ MixAll.properties2Object(properties, nettyServerConfig);
+ MixAll.properties2Object(properties, nettyClientConfig);
+ }
+
+ if (useDefaultPort) {
+ nettyServerConfig.setListenPort(10811);
+ }
+
+ // Command-line values are applied after the file values.
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+ if (null == brokerConfig.getRocketmqHome()) {
+ System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+ System.exit(-2);
+ }
+ rocketmqHome = brokerConfig.getRocketmqHome();
+
+ // Validate every ';'-separated name server address up front.
+ String namesrvAddr = brokerConfig.getNamesrvAddr();
+ if (null != namesrvAddr) {
+ try {
+ String[] addrArray = namesrvAddr.split(";");
+ for (String addr : addrArray) {
+ RemotingUtil.string2SocketAddress(addr);
+ }
+ } catch (Exception e) {
+ System.out.printf(
+ "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+ namesrvAddr);
+ System.exit(-3);
+ }
+ }
+
+ // Reset logback and reconfigure it from the broker logback file under rocketmqHome.
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ configurator.setContext(lc);
+ lc.reset();
+ //https://logback.qos.ch/manual/configuration.html
+ lc.setPackagingDataEnabled(false);
+
+ configurator.doConfigure(rocketmqHome + "/conf/logback_broker.xml");
+
+ // The print options dump the effective configuration and exit without starting anything.
+ if (commandLine.hasOption(PRINT_PROPERTIES_OPTION)) {
+ InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+ MixAll.printObjectProperties(console, brokerConfig);
+ MixAll.printObjectProperties(console, nettyServerConfig);
+ MixAll.printObjectProperties(console, nettyClientConfig);
+ System.exit(0);
+ } else if (commandLine.hasOption(PRINT_IMPORTANT_PROPERTIES_OPTION)) {
+ InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+ MixAll.printObjectProperties(console, brokerConfig, true);
+ MixAll.printObjectProperties(console, nettyServerConfig, true);
+ MixAll.printObjectProperties(console, nettyClientConfig, true);
+ System.exit(0);
+ }
+
+ log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ MixAll.printObjectProperties(log, brokerConfig);
+ MixAll.printObjectProperties(log, nettyServerConfig);
+ MixAll.printObjectProperties(log, nettyClientConfig);
+
+ final BrokerContainer brokerContainer = new BrokerContainer(
+ brokerConfig,
+ nettyServerConfig,
+ nettyClientConfig);
+ // remember all configs to prevent discard
+ brokerContainer.getConfiguration().registerConfig(properties);
+
+ boolean initResult = brokerContainer.initialize();
+ if (!initResult) {
+ brokerContainer.shutdown();
+ System.exit(-3);
+ }
+
+ // The hook shuts the container down at most once, even if run() is invoked repeatedly.
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ private volatile boolean hasShutdown = false;
+ private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
+ if (!this.hasShutdown) {
+ this.hasShutdown = true;
+ long beginTime = System.currentTimeMillis();
+ brokerContainer.shutdown();
+ long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+ log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
+ }
+ }
+ }
+ }, "ShutdownHook"));
+
+ return brokerContainer;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ // Only reached if System.exit returns; required by the compiler.
+ return null;
+ }
+
+ private static void properties2SystemEnv(Properties properties) {
+ if (properties == null) {
+ return;
+ }
+ String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
+ String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
+ System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
+ System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
+ }
+
+ private static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("c", "configFile", true, "Config file for shared broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("p", "printConfigItem", false, "Print all config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "printImportantConfig", false, "Print important config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerConfigFiles", true, "The path of broker config files, split by ':'");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ /**
+  * Loads a properties file from a configurable path.
+  */
+ public static class SystemConfigFileHelper {
+     private static final Logger LOGGER = LoggerFactory.getLogger(SystemConfigFileHelper.class);
+
+     // Path of the properties file read by loadConfig.
+     private String file;
+
+     public SystemConfigFileHelper() {
+     }
+
+     /**
+      * Reads the configured file into a new {@link Properties} instance.
+      *
+      * @return the loaded properties
+      * @throws Exception if the file cannot be opened or read
+      */
+     public Properties loadConfig() throws Exception {
+         // try-with-resources closes the stream even when load() throws.
+         try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
+             Properties properties = new Properties();
+             properties.load(in);
+             return properties;
+         }
+     }
+
+     /**
+      * Does not persist anything; only logs that the update was ignored.
+      *
+      * @param properties ignored
+      */
+     public void update(Properties properties) throws Exception {
+         LOGGER.error("[SystemConfigFileHelper] update no thing.");
+     }
+
+     public void setFile(String file) {
+         this.file = file;
+     }
+
+     public String getFile() {
+         return file;
+     }
+ }
+
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java b/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
new file mode 100644
index 0000000..40a0187
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
+import org.apache.rocketmq.common.BrokerSyncInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.broker.schedule.DelayOffsetSerializeWrapper;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
+
+public class BrokerPreOnlineService extends ServiceThread {
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final InnerBrokerController brokerController;
+
+ private int waitBrokerIndex = 0;
+
+ /**
+ * @param brokerController the in-container broker this service brings online
+ */
+ public BrokerPreOnlineService(InnerBrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ /**
+  * Returns the service name: the class's simple name, prefixed with the broker's logger
+  * identifier when the broker runs inside a broker container.
+  */
+ @Override
+ public String getServiceName() {
+     final String simpleName = BrokerPreOnlineService.class.getSimpleName();
+     if (this.brokerController == null || !this.brokerController.getBrokerConfig().isInBrokerContainer()) {
+         return simpleName;
+     }
+     return brokerController.getBrokerConfig().getLoggerIdentifier() + simpleName;
+ }
+
+ /**
+  * Retries the pre-online preparation until it succeeds, the broker is no longer isolated,
+  * or the service is stopped. Failed attempts, including ones that throw, are followed by
+  * a wait of up to one second before the next attempt.
+  */
+ @Override
+ public void run() {
+     LOGGER.info(this.getServiceName() + " service started");
+
+     while (!this.isStopped()) {
+         if (!this.brokerController.isIsolated()) {
+             LOGGER.info("broker {} is online", this.brokerController.getBrokerConfig().getCanonicalName());
+             break;
+         }
+         try {
+             if (this.prepareForBrokerOnline()) {
+                 break;
+             }
+             this.waitForRunning(1000);
+         } catch (Exception e) {
+             LOGGER.error("Broker preOnline error, ", e);
+             // Back off here as well; otherwise a persistent failure spins and floods the log.
+             this.waitForRunning(1000);
+         }
+     }
+
+     LOGGER.info(this.getServiceName() + " service end");
+ }
+
+ /**
+ * Registers a notification request for the HA connection with the given broker reaching
+ * the TRANSFER state and returns the request's future.
+ *
+ * @param brokerAddr address of the peer broker; only its host part is put in the request
+ * @return the request future; completed with false immediately when there is no HA service
+ */
+ CompletableFuture<Boolean> waitForHaHandshakeComplete(String brokerAddr) {
+ LOGGER.info("wait for handshake completion with {}", brokerAddr);
+ HAConnectionStateNotificationRequest request =
+ new HAConnectionStateNotificationRequest(HAConnectionState.TRANSFER, RemotingHelper.parseHostFromAddress(brokerAddr), true);
+ if (this.brokerController.getMessageStore().getHaService() != null) {
+ this.brokerController.getMessageStore().getHaService().putGroupConnectionStateRequest(request);
+ } else {
+ LOGGER.error("HAService is null, maybe broker config is wrong. For example, duplicationEnable is true");
+ request.getRequestFuture().complete(false);
+ }
+ return request.getRequestFuture();
+ }
+
+ /**
+ * Continuation run after the HA handshake future completes.
+ *
+ * @param result outcome of the handshake wait
+ * @param brokerMemberGroup current members of the broker group
+ * @return false when the handshake failed, true otherwise
+ */
+ private boolean futureWaitAction(boolean result, BrokerMemberGroup brokerMemberGroup) {
+ if (!result) {
+ LOGGER.error("wait for handshake completion failed, HA connection lost");
+ return false;
+ }
+ // Only non-master brokers start their service here; the master path starts it in
+ // prepareForMasterOnline.
+ if (this.brokerController.getBrokerConfig().getBrokerId() != MixAll.MASTER_ID) {
+ LOGGER.info("slave preOnline complete, start service");
+ long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
+ this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+ }
+ return true;
+ }
+
+ /**
+ * Master pre-online procedure: for each group member in ascending broker id order, sends
+ * this broker's HA info, waits for the HA handshake, then pulls that member's metadata.
+ * Progress is kept in waitBrokerIndex, so a retry resumes at the member that failed.
+ *
+ * @param brokerMemberGroup current members of the broker group
+ * @return true once all members were processed and the service was started; false when a
+ * step failed and the caller should retry
+ */
+ private boolean prepareForMasterOnline(BrokerMemberGroup brokerMemberGroup) {
+ List<Long> brokerIdList = new ArrayList<>(brokerMemberGroup.getBrokerAddrs().keySet());
+ Collections.sort(brokerIdList);
+ while (true) {
+ if (waitBrokerIndex >= brokerIdList.size()) {
+ LOGGER.info("master preOnline complete, start service");
+ this.brokerController.startService(MixAll.MASTER_ID, this.brokerController.getBrokerAddr());
+ return true;
+ }
+
+ String brokerAddrToWait = brokerMemberGroup.getBrokerAddrs().get(brokerIdList.get(waitBrokerIndex));
+
+ try {
+ this.brokerController.getBrokerOuterAPI().
+ sendBrokerHaInfo(brokerAddrToWait, this.brokerController.getHAServerAddr(),
+ this.brokerController.getMessageStore().getBrokerInitMaxOffset(), this.brokerController.getBrokerAddr());
+ } catch (Exception e) {
+ LOGGER.error("send ha address to {} exception, {}", brokerAddrToWait, e);
+ return false;
+ }
+
+ CompletableFuture<Boolean> haHandshakeFuture = waitForHaHandshakeComplete(brokerAddrToWait)
+ .thenApply(result -> futureWaitAction(result, brokerMemberGroup));
+
+ try {
+ // Blocks without a timeout until the handshake future completes.
+ if (!haHandshakeFuture.get()) {
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Wait handshake completion exception, {}", e);
+ return false;
+ }
+
+ if (syncMetadataReverse(brokerAddrToWait)) {
+ waitBrokerIndex++;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Pulls consumer offsets and delay offsets from the given broker and adopts each of them
+ * when the local data version compares as less than or equal to the remote one, then lets
+ * every attached plugin sync its own metadata.
+ *
+ * @param brokerAddr address of the broker to pull metadata from
+ * @return true on success; false when any step threw
+ */
+ private boolean syncMetadataReverse(String brokerAddr) {
+ try {
+ LOGGER.info("Get metadata reverse from {}", brokerAddr);
+
+ String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(brokerAddr);
+ DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
+ DelayOffsetSerializeWrapper.fromJson(delayOffset, DelayOffsetSerializeWrapper.class);
+
+ ConsumerOffsetSerializeWrapper consumerOffsetSerializeWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(brokerAddr);
+
+ if (null != consumerOffsetSerializeWrapper && brokerController.getConsumerOffsetManager().getDataVersion().compare(consumerOffsetSerializeWrapper.getDataVersion()) <= 0) {
+ LOGGER.info("{}'s consumerOffset data version is larger than master broker, {}'s consumerOffset will be used.", brokerAddr, brokerAddr);
+ this.brokerController.getConsumerOffsetManager().getOffsetTable()
+ .putAll(consumerOffsetSerializeWrapper.getOffsetTable());
+ this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(consumerOffsetSerializeWrapper.getDataVersion());
+ this.brokerController.getConsumerOffsetManager().persist();
+ }
+
+ // NOTE(review): the guard checks delayOffset but dereferences delayOffsetSerializeWrapper;
+ // if fromJson can return null for a non-null string this throws and the method returns
+ // false via the outer catch — confirm.
+ if (null != delayOffset && brokerController.getScheduleMessageService().getDataVersion().compare(delayOffsetSerializeWrapper.getDataVersion()) <= 0) {
+ LOGGER.info("{}'s scheduleMessageService data version is larger than master broker, {}'s delayOffset will be used.", brokerAddr, brokerAddr);
+ String fileName =
+ StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+ .getMessageStoreConfig().getStorePathRootDir());
+ try {
+ // Write the remote delay offsets to the local store file, then reload from it.
+ MixAll.string2File(delayOffset, fileName);
+ this.brokerController.getScheduleMessageService().load();
+ } catch (IOException e) {
+ // A persist failure is logged only; the method still reports success.
+ LOGGER.error("Persist file Exception, {}", fileName, e);
+ }
+ }
+
+ for (BrokerAttachedPlugin brokerAttachedPlugin : brokerController.getBrokerAttachedPlugins()) {
+ if (brokerAttachedPlugin != null) {
+ brokerAttachedPlugin.syncMetadataReverse(brokerAddr);
+ }
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("GetMetadataReverse Failed", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Slave pre-online procedure: retrieves the master's HA info, optionally seeds the master
+ * flushed offset, points the message store at the master and waits for the HA handshake.
+ *
+ * @param brokerMemberGroup current members of the broker group; the master entry is used
+ * @return true when the slave is online (or was started directly because the master
+ * reported no HA address); false when a step failed and the caller should retry
+ */
+ private boolean prepareForSlaveOnline(BrokerMemberGroup brokerMemberGroup) {
+ BrokerSyncInfo brokerSyncInfo;
+ try {
+ brokerSyncInfo = this.brokerController.getBrokerOuterAPI()
+ .retrieveBrokerHaInfo(brokerMemberGroup.getBrokerAddrs().get(MixAll.MASTER_ID));
+ } catch (Exception e) {
+ LOGGER.error("retrieve master ha info exception, {}", e);
+ return false;
+ }
+
+ // Seed the master flushed offset only when it is still 0 and the feature is enabled.
+ if (this.brokerController.getMessageStore().getMasterFlushedOffset() == 0
+ && this.brokerController.getMessageStoreConfig().isSyncMasterFlushOffsetWhenStartup()) {
+ LOGGER.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+ this.brokerController.getMessageStore().setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+ }
+
+ if (brokerSyncInfo.getMasterHaAddress() != null) {
+ this.brokerController.getMessageStore().updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+ this.brokerController.getMessageStore().updateMasterAddress(brokerSyncInfo.getMasterAddress());
+ } else {
+ LOGGER.info("fetch master ha address return null, start service directly");
+ long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
+ this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+ return true;
+ }
+
+ // futureWaitAction starts the service once the handshake succeeded.
+ CompletableFuture<Boolean> haHandshakeFuture = waitForHaHandshakeComplete(brokerSyncInfo.getMasterHaAddress())
+ .thenApply(result -> futureWaitAction(result, brokerMemberGroup));
+
+ try {
+ // Blocks without a timeout until the handshake future completes.
+ if (!haHandshakeFuture.get()) {
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Wait handshake completion exception, {}", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Fetches the broker group membership from the name server and dispatches to the master or
+ * slave pre-online procedure; starts the service directly when no master or no other
+ * broker is online.
+ *
+ * @return true when the broker went online; false when the caller should retry later
+ */
+ private boolean prepareForBrokerOnline() {
+ BrokerMemberGroup brokerMemberGroup;
+ try {
+ brokerMemberGroup = this.brokerController.getBrokerOuterAPI().syncBrokerMemberGroup(
+ this.brokerController.getBrokerConfig().getBrokerClusterName(),
+ this.brokerController.getBrokerConfig().getBrokerName(),
+ this.brokerController.getBrokerContainer().getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+ } catch (Exception e) {
+ LOGGER.error("syncBrokerMemberGroup from namesrv error, start service failed, will try later, ", e);
+ return false;
+ }
+
+ if (brokerMemberGroup != null && !brokerMemberGroup.getBrokerAddrs().isEmpty()) {
+ // Smallest broker id among the other members (see getMinBrokerId).
+ long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
+
+ if (this.brokerController.getBrokerConfig().getBrokerId() == MixAll.MASTER_ID) {
+ return prepareForMasterOnline(brokerMemberGroup);
+ } else if (minBrokerId == MixAll.MASTER_ID) {
+ return prepareForSlaveOnline(brokerMemberGroup);
+ } else {
+ LOGGER.info("no master online, start service directly");
+ this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+ }
+ } else {
+ LOGGER.info("no other broker online, will start service directly");
+ this.brokerController.startService(this.brokerController.getBrokerConfig().getBrokerId(), this.brokerController.getBrokerAddr());
+ }
+
+ return true;
+ }
+
+ private long getMinBrokerId(Map<Long, String> brokerAddrMap) {
+ Map<Long, String> brokerAddrMapCopy = new HashMap<>(brokerAddrMap);
+ brokerAddrMapCopy.remove(this.brokerController.getBrokerConfig().getBrokerId());
+ if (!brokerAddrMapCopy.isEmpty()) {
+ return Collections.min(brokerAddrMapCopy.keySet());
+ }
+ return this.brokerController.getBrokerConfig().getBrokerId();
+ }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
new file mode 100644
index 0000000..dc9f463
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import io.netty.channel.Channel;
+import java.util.Collection;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+ /**
+  * Channel event listener of a broker container: every channel event is forwarded to all master and
+  * slave brokers returned by the container so that each of them can clean up its client tables and
+  * update its own channel statistics.
+  */
+ public class ContainerClientHouseKeepingService implements ChannelEventListener {
+     private final IBrokerContainer brokerContainer;
+
+     public ContainerClientHouseKeepingService(final IBrokerContainer brokerContainer) {
+         this.brokerContainer = brokerContainer;
+     }
+
+     @Override
+     public void onChannelConnect(String remoteAddr, Channel channel) {
+         onChannelOperation(CallbackCode.CONNECT, remoteAddr, channel);
+     }
+
+     @Override
+     public void onChannelClose(String remoteAddr, Channel channel) {
+         onChannelOperation(CallbackCode.CLOSE, remoteAddr, channel);
+     }
+
+     @Override
+     public void onChannelException(String remoteAddr, Channel channel) {
+         onChannelOperation(CallbackCode.EXCEPTION, remoteAddr, channel);
+     }
+
+     @Override
+     public void onChannelIdle(String remoteAddr, Channel channel) {
+         onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
+     }
+
+     /**
+      * Fans a channel event out to every broker in the container, masters first, then slaves.
+      */
+     private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) {
+         Collection<InnerBrokerController> masterBrokers = this.brokerContainer.getMasterBrokers();
+         Collection<InnerSalveBrokerController> slaveBrokers = this.brokerContainer.getSlaveBrokers();
+
+         for (BrokerController masterBroker : masterBrokers) {
+             brokerOperation(masterBroker, callbackCode, remoteAddr, channel);
+         }
+
+         for (InnerSalveBrokerController slaveBroker : slaveBrokers) {
+             brokerOperation(slaveBroker, callbackCode, remoteAddr, channel);
+         }
+     }
+
+     /**
+      * Applies one channel event to one broker.
+      * <p>
+      * A connect event only increments the connect counter. Any other event is passed to both the
+      * producer manager and the consumer manager, and the matching counter is incremented when at
+      * least one of them reports a removal.
+      */
+     private void brokerOperation(BrokerController brokerController, CallbackCode callbackCode, String remoteAddr,
+         Channel channel) {
+         if (callbackCode == CallbackCode.CONNECT) {
+             brokerController.getBrokerStatsManager().incChannelConnectNum();
+             return;
+         }
+         // Both managers must always be notified, so evaluate them separately instead of short-circuiting.
+         boolean removedFromProducers = brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+         boolean removedFromConsumers = brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+         // A channel registered only as producer or only as consumer still belongs to this broker;
+         // requiring both removals would leave such channels out of the statistics.
+         if (!(removedFromProducers || removedFromConsumers)) {
+             return;
+         }
+         switch (callbackCode) {
+             case CLOSE:
+                 brokerController.getBrokerStatsManager().incChannelCloseNum();
+                 break;
+             case EXCEPTION:
+                 brokerController.getBrokerStatsManager().incChannelExceptionNum();
+                 break;
+             case IDLE:
+                 brokerController.getBrokerStatsManager().incChannelIdleNum();
+                 break;
+             default:
+                 break;
+         }
+     }
+
+     public enum CallbackCode {
+         /**
+          * onChannelConnect
+          */
+         CONNECT,
+         /**
+          * onChannelClose
+          */
+         CLOSE,
+         /**
+          * onChannelException
+          */
+         EXCEPTION,
+         /**
+          * onChannelIdle
+          */
+         IDLE
+     }
+ }
diff --git a/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java
new file mode 100644
index 0000000..d3cdc05
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+/**
+ * An interface for broker container to hold multiple master and slave brokers.
+ */
+public interface IBrokerContainer {
+
+ /**
+ * Start broker container
+ */
+ void start() throws Exception;
+
+ /**
+ * Shutdown broker container and all the brokers inside.
+ */
+ void shutdown();
+
+ /**
+ * Add a broker to this container with specific broker config.
+ *
+ * @param brokerConfig the specified broker config
+ * @param storeConfig the specified store config
+ * @return the added BrokerController or null if the broker already exists
+ * @throws Exception when initialize broker
+ */
+ BrokerController addBroker(BrokerConfig brokerConfig, MessageStoreConfig storeConfig) throws Exception;
+
+ /**
+ * Remove the broker from this container associated with the specific broker identity
+ *
+ * @param brokerIdentity the specific broker identity
+ * @return the removed BrokerController or null if the broker doesn't exists
+ */
+ BrokerController removeBroker(BrokerIdentity brokerIdentity) throws Exception;
+
+ /**
+ * Return the broker controller associated with the specific broker identity
+ *
+ * @param brokerIdentity the specific broker identity
+ * @return the associated messaging broker or null
+ */
+ BrokerController getBroker(BrokerIdentity brokerIdentity);
+
+ /**
+ * Return all the master brokers belong to this container
+ *
+ * @return the master broker list
+ */
+ Collection<InnerBrokerController> getMasterBrokers();
+
+ /**
+ * Return all the slave brokers belong to this container
+ *
+ * @return the slave broker list
+ */
+ Collection<InnerSalveBrokerController> getSlaveBrokers();
+
+ /**
+ * Return all broker controller in this container
+ *
+ * @return all broker controller
+ */
+ List<BrokerController> getBrokerControllers();
+
+ /**
+ * Return the address of broker container.
+ *
+ * @return broker container address.
+ */
+ String getBrokerContainerAddr();
+
+ /**
+ * Peek the first master broker in container.
+ *
+ * @return the first master broker in container
+ */
+ BrokerController peekMasterBroker();
+
+ /**
+ * Return the config of the broker container
+ *
+ * @return the broker container config
+ */
+ BrokerContainerConfig getBrokerContainerConfig();
+
+ /**
+ * Get netty server config.
+ *
+ * @return netty server config
+ */
+ NettyServerConfig getNettyServerConfig();
+
+ /**
+ * Get netty client config.
+ *
+ * @return netty client config
+ */
+ NettyClientConfig getNettyClientConfig();
+
+ /**
+ * Return the shared BrokerOuterAPI
+ *
+ * @return the shared BrokerOuterAPI
+ */
+ BrokerOuterAPI getBrokerOuterAPI();
+
+ /**
+ * Return the shared RemotingServer
+ *
+ * @return the shared RemotingServer
+ */
+ RemotingServer getRemotingServer();
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
new file mode 100644
index 0000000..4603164
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+ /**
+  * A {@link BrokerController} that runs inside a {@link BrokerContainer}. It uses the container's
+  * {@link BrokerOuterAPI} and derives its remoting servers from the container's remoting server
+  * instead of creating its own.
+  */
+ public class InnerBrokerController extends BrokerController {
+     private ScheduledExecutorService syncBrokerMemberGroupExecutorService;
+     private ScheduledExecutorService brokerHeartbeatExecutorService;
+     protected volatile long minBrokerIdInGroup = 0;
+     protected volatile String minBrokerAddrInGroup = null;
+     /** Futures of the periodic tasks scheduled in {@link #start()}, cancelled in {@link #shutdown()}. */
+     protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
+     /** Only created when the broker config does not skip the pre-online phase; otherwise null. */
+     private BrokerPreOnlineService brokerPreOnlineService;
+     protected BrokerContainer brokerContainer;
+     /** While true, the periodic register and heartbeat tasks are skipped. */
+     protected volatile boolean isIsolated = false;
+
+     public InnerBrokerController(
+         final BrokerContainer brokerContainer,
+         final BrokerConfig brokerConfig,
+         final MessageStoreConfig messageStoreConfig
+     ) {
+         super(brokerConfig, messageStoreConfig);
+         this.brokerContainer = brokerContainer;
+         this.brokerOuterAPI = this.brokerContainer.getBrokerOuterAPI();
+
+         if (!this.brokerConfig.isSkipPreOnline()) {
+             this.brokerPreOnlineService = new BrokerPreOnlineService(this);
+         }
+     }
+
+     /**
+      * Creates the normal and the fast remoting server from the container's remoting server; the fast
+      * one listens on {@code listenPort - 2}.
+      */
+     @Override
+     protected void initializeRemotingServer() {
+         this.remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
+         this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
+     }
+
+     /**
+      * Initialize resources for master which will be re-used by slave.
+      */
+     @Override
+     protected void initializeResources() {
+         super.initializeResources();
+         this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+             new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", brokerConfig));
+         this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
+             new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", brokerConfig));
+     }
+
+     @Override
+     protected void initializeScheduledTasks() {
+         initializeBrokerScheduledTasks();
+     }
+
+     /**
+      * Stops the basic services, removes this broker's remoting servers from the container's remoting
+      * server, cancels the periodic tasks and shuts down the pre-online service and the executors
+      * owned by this controller.
+      */
+     @Override
+     public void shutdown() {
+
+         shutdownBasicService();
+
+         if (this.remotingServer != null) {
+             this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
+         }
+
+         if (this.fastRemotingServer != null) {
+             this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
+         }
+
+         for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+             scheduledFuture.cancel(true);
+         }
+
+         if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
+             this.brokerPreOnlineService.shutdown();
+         }
+
+         shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
+         shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
+
+     }
+
+     @Override
+     public String getBrokerAddr() {
+         return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
+     }
+
+     /**
+      * Starts the broker: basic services, the optional pre-online service, the periodic name server
+      * registration and, when slave-acting-master is enabled, the heartbeat and broker member group
+      * sync tasks.
+      * <p>
+      * A broker whose store config has more than one replica starts isolated and stays so until
+      * {@link #startService(long, String)} or {@link #startServiceWithoutCondition()} is called.
+      *
+      * @throws Exception if a service fails to start
+      */
+     @Override
+     public void start() throws Exception {
+
+         this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
+
+         if (messageStoreConfig.getTotalReplicas() > 1) {
+             isIsolated = true;
+         }
+
+         startBasicService();
+
+         if (this.brokerPreOnlineService != null) {
+             this.brokerPreOnlineService.start();
+         }
+
+         if (!isIsolated) {
+             this.registerBrokerAll(true, false, true);
+         }
+
+         // Periodic registration; the period is clamped to the range [10s, 60s].
+         scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+             @Override
+             public void run2() {
+                 try {
+                     if (System.currentTimeMillis() < shouldStartTime) {
+                         BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
+                         return;
+                     }
+                     if (isIsolated) {
+                         BrokerController.LOG.info("Skip register for broker is isolated");
+                         return;
+                     }
+                     InnerBrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+                 } catch (Throwable e) {
+                     BrokerController.LOG.error("registerBrokerAll Exception", e);
+                 }
+             }
+         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
+
+         if (this.brokerConfig.isEnableSlaveActingMaster()) {
+             scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+                 @Override
+                 public void run2() {
+                     if (isIsolated) {
+                         return;
+                     }
+                     try {
+                         InnerBrokerController.this.sendHeartbeat();
+                     } catch (Exception e) {
+                         BrokerController.LOG.error("sendHeartbeat Exception", e);
+                     }
+
+                 }
+             }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
+
+             scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+                 @Override
+                 public void run2() {
+                     try {
+                         InnerBrokerController.this.syncBrokerMemberGroup();
+                     } catch (Throwable e) {
+                         BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
+                     }
+                 }
+             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
+
+         }
+
+         if (!isIsolated && !messageStoreConfig.isEnableDLegerCommitLog()
+             && !messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isEnableSlaveActingMaster()) {
+             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
+         }
+
+         if (brokerConfig.isSkipPreOnline()) {
+             startServiceWithoutCondition();
+         }
+     }
+
+     /**
+      * Sends one heartbeat through the broker outer API, using the data-version variant when the
+      * container is configured to be compatible with old name servers.
+      */
+     private void sendHeartbeat() {
+         if (this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv()) {
+             this.brokerOuterAPI.sendHeartbeatViaDataVersion(
+                 this.brokerConfig.getBrokerClusterName(),
+                 this.getBrokerAddr(),
+                 this.brokerConfig.getBrokerName(),
+                 this.brokerConfig.getBrokerId(),
+                 this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                 this.getTopicConfigManager().getDataVersion(),
+                 this.brokerConfig.isInBrokerContainer());
+         } else {
+             this.brokerOuterAPI.sendHeartbeat(
+                 this.brokerConfig.getBrokerClusterName(),
+                 this.getBrokerAddr(),
+                 this.brokerConfig.getBrokerName(),
+                 this.brokerConfig.getBrokerId(),
+                 this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                 this.brokerConfig.isInBrokerContainer());
+         }
+     }
+
+     /**
+      * Refreshes the broker member group through the container's broker outer API, updates the alive
+      * replica count of the message store and, unless this broker is isolated, the minimum broker of
+      * the group. A failed or empty lookup is logged and otherwise ignored.
+      */
+     public void syncBrokerMemberGroup() {
+         try {
+             brokerMemberGroup = this.brokerContainer.getBrokerOuterAPI()
+                 .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+         } catch (Exception e) {
+             BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
+             return;
+         }
+         if (brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().isEmpty()) {
+             BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
+             return;
+         }
+         this.messageStore.setAliveReplicaNumInGroup(calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs()));
+
+         if (!this.isIsolated) {
+             long minBrokerId = brokerMemberGroup.minimumBrokerId();
+             this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+         }
+     }
+
+     /**
+      * Counts the brokers in the address table, adding one for this broker when it is not listed.
+      */
+     private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
+         if (brokerAddrTable.containsKey(this.brokerConfig.getBrokerId())) {
+             return brokerAddrTable.size();
+         } else {
+             return brokerAddrTable.size() + 1;
+         }
+     }
+
+     /**
+      * Registers this broker through the broker outer API and hands the results to
+      * {@code handleRegisterBrokerResult}; does nothing once the broker has been shut down.
+      */
+     @Override
+     protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+         TopicConfigSerializeWrapper topicConfigWrapper) {
+
+         if (shutdown) {
+             BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
+             return;
+         }
+         List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+             this.brokerConfig.getBrokerClusterName(),
+             this.getBrokerAddr(),
+             this.brokerConfig.getBrokerName(),
+             this.brokerConfig.getBrokerId(),
+             this.getHAServerAddr(),
+             topicConfigWrapper,
+             this.filterServerManager.buildNewFilterServerList(),
+             oneway,
+             this.brokerConfig.getRegisterBrokerTimeoutMills(),
+             this.brokerConfig.isEnableSlaveActingMaster(),
+             this.brokerConfig.isCompressedRegister(),
+             this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
+             this.brokerConfig.isInBrokerContainer());
+
+         handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
+     }
+
+     /**
+      * Resolves the name server list from the container config.
+      *
+      * @return the configured name server address, the address fetched from the address server when
+      *         that is enabled, or null when neither applies
+      */
+     @Override
+     public String getNameServerList() {
+         // Read the configured address once so the null check and the value used cannot diverge.
+         final String namesrvAddr = this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr();
+         if (namesrvAddr != null) {
+             this.brokerContainer.getBrokerOuterAPI().updateNameServerAddressList(namesrvAddr);
+             return namesrvAddr;
+         } else if (this.brokerContainer.getBrokerContainerConfig().isFetchNamesrvAddrByAddressServer()) {
+             return this.brokerContainer.getBrokerOuterAPI().fetchNameServerAddr();
+         }
+         return null;
+     }
+
+     @Override
+     public String getHAServerAddr() {
+         return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
+     }
+
+     @Override
+     public long getMinBrokerIdInGroup() {
+         return this.minBrokerIdInGroup;
+     }
+
+     /**
+      * @return true when both the schedule service and the transaction check service are started, or
+      *         when the pop revive service of the ack message processor is running
+      */
+     @Override
+     public boolean isSpecialServiceRunning() {
+         if (isScheduleServiceStart() && isTransactionCheckServiceStart()) {
+             return true;
+         }
+
+         return this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning();
+     }
+
+     @Override
+     public int getListenPort() {
+         return this.brokerConfig.getListenPort();
+     }
+
+     public BrokerOuterAPI getBrokerOuterAPI() {
+         return brokerContainer.getBrokerOuterAPI();
+     }
+
+     /**
+      * Brings the broker into service with a known minimum broker of the group: records it, switches
+      * the special services on only if this broker is that minimum broker, registers the broker and
+      * clears the isolated flag.
+      *
+      * @param minBrokerId the minimum broker id in the group
+      * @param minBrokerAddr the address of the minimum broker
+      */
+     public void startService(long minBrokerId, String minBrokerAddr) {
+         BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
+             this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
+         this.minBrokerIdInGroup = minBrokerId;
+         this.minBrokerAddrInGroup = minBrokerAddr;
+
+         this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == minBrokerId);
+         this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+         isIsolated = false;
+     }
+
+     /**
+      * Brings the broker into service without consulting the group: the special services are switched
+      * on only for a master, the broker is registered and the isolated flag is cleared.
+      */
+     public void startServiceWithoutCondition() {
+         BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
+
+         this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
+         this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+         isIsolated = false;
+     }
+
+     /**
+      * Takes the broker out of service: marks it isolated, switches the special services off and
+      * closes the channels of the container's remoting client.
+      */
+     public void stopService() {
+         BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
+         isIsolated = true;
+         this.changeSpecialServiceStatus(false);
+         this.closeChannels();
+     }
+
+     public synchronized void closeChannels() {
+         this.brokerContainer.getBrokerOuterAPI().getRemotingClient().closeChannels();
+     }
+
+     public BrokerContainer getBrokerContainer() {
+         return this.brokerContainer;
+     }
+
+     public boolean isIsolated() {
+         return this.isIsolated;
+     }
+
+     public NettyServerConfig getNettyServerConfig() {
+         return brokerContainer.getNettyServerConfig();
+     }
+
+     public NettyClientConfig getNettyClientConfig() {
+         return brokerContainer.getNettyClientConfig();
+     }
+
+     /**
+      * Looks up a message store by broker name: this broker's own store when the name matches,
+      * otherwise the store of the broker the container finds under that name.
+      *
+      * @param brokerName the broker name to look up
+      * @return the matching message store, or null when the container knows no such broker
+      */
+     public MessageStore getMessageStoreByBrokerName(String brokerName) {
+         if (this.brokerConfig.getBrokerName().equals(brokerName)) {
+             return this.getMessageStore();
+         }
+         BrokerController brokerController = this.brokerContainer.findBrokerControllerByBrokerName(brokerName);
+         if (brokerController != null) {
+             return brokerController.getMessageStore();
+         }
+         return null;
+     }
+
+     /**
+      * @return this controller when it is a master, otherwise the master broker peeked from the container
+      */
+     @Override
+     public BrokerController peekMasterBroker() {
+         if (brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
+             return this;
+         }
+         return this.brokerContainer.peekMasterBroker();
+     }
+
+     public BrokerPreOnlineService getBrokerPreOnlineService() {
+         return brokerPreOnlineService;
+     }
+ }
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
new file mode 100644
index 0000000..e00242a
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerSyncInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+ /**
+  * An {@link InnerBrokerController} for a slave broker inside a broker container. It reacts to
+  * changes of the minimum broker in its broker group by stopping or resuming synchronization with
+  * the master.
+  */
+ public class InnerSalveBrokerController extends InnerBrokerController {
+
+     /** Serializes the handling of minimum-broker changes. */
+     private final Lock lock = new ReentrantLock();
+
+     public InnerSalveBrokerController(final BrokerContainer brokerContainer,
+         final BrokerConfig brokerConfig,
+         final MessageStoreConfig storeConfig) {
+         super(brokerContainer, brokerConfig, storeConfig);
+         // Check configs
+         checkSlaveBrokerConfig();
+     }
+
+     /**
+      * Verifies that cluster name and broker name are set and that the broker id is not the master id.
+      */
+     private void checkSlaveBrokerConfig() {
+         Preconditions.checkNotNull(brokerConfig.getBrokerClusterName());
+         Preconditions.checkNotNull(brokerConfig.getBrokerName());
+         Preconditions.checkArgument(brokerConfig.getBrokerId() != MixAll.MASTER_ID);
+     }
+
+     /**
+      * Closes the channels to the current master (normal and VIP address) and clears the master
+      * addresses used for synchronization.
+      */
+     private void onMasterOffline() {
+         // close channels with master broker
+         String masterAddr = this.slaveSynchronize.getMasterAddr();
+         if (masterAddr != null) {
+             this.brokerOuterAPI.getRemotingClient().closeChannels(
+                 Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
+         }
+         // master not available, stop sync
+         this.slaveSynchronize.setMasterAddr(null);
+         this.messageStore.updateHaMasterAddress(null);
+     }
+
+     /**
+      * Points the message store at the master and wakes up the HA client.
+      * <p>
+      * The master is queried for its HA info when no HA address was supplied, or when the master
+      * flushed offset is still 0 and syncing it at startup is enabled. A failed query is logged and
+      * the HA client is woken up regardless.
+      *
+      * @param masterAddr the master broker address
+      * @param masterHaAddr the master HA address, or null to retrieve it from the master
+      */
+     private void onMasterOnline(String masterAddr, String masterHaAddr) {
+         boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
+             && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
+         if (masterHaAddr == null || needSyncMasterFlushOffset) {
+             try {
+                 BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
+
+                 if (needSyncMasterFlushOffset) {
+                     LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+                     this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+                 }
+
+                 if (masterHaAddr == null) {
+                     this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+                     this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
+                 }
+             } catch (Exception e) {
+                 // The exception is the only argument, so a "{}" placeholder would be printed literally.
+                 LOG.error("retrieve master ha info exception", e);
+             }
+         }
+
+         // set master HA address.
+         if (masterHaAddr != null) {
+             this.messageStore.updateHaMasterAddress(masterHaAddr);
+         }
+
+         // wakeup HAClient
+         this.messageStore.wakeupHAClient();
+     }
+
+     /**
+      * Applies a change of the minimum broker in the group. Callers in this class hold {@link #lock}.
+      *
+      * @param minBrokerId the new minimum broker id
+      * @param minBrokerAddr the address of the new minimum broker
+      * @param offlineBrokerAddr the address of a broker that went offline, or null
+      * @param masterHaAddr the master HA address, or null if unknown
+      */
+     private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+         String masterHaAddr) {
+         LOG.info("Min broker changed, old: {}-{}, new {}-{}",
+             this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
+
+         this.minBrokerIdInGroup = minBrokerId;
+         this.minBrokerAddrInGroup = minBrokerAddr;
+
+         this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
+
+         if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
+             // master offline
+             onMasterOffline();
+         }
+
+         if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
+             // master online
+             onMasterOnline(minBrokerAddr, masterHaAddr);
+         }
+
+         // notify PullRequest on hold to pull from master.
+         if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
+             this.pullRequestHoldService.notifyMasterOnline();
+         }
+     }
+
+     /**
+      * Updates the minimum broker if it changed. The update is skipped when another update currently
+      * holds the lock. When the minimum id grew, the previous minimum broker is treated as offline.
+      */
+     @Override
+     public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
+         if (lock.tryLock()) {
+             try {
+                 if (minBrokerId != this.minBrokerIdInGroup) {
+                     String offlineBrokerAddr = null;
+                     if (minBrokerId > this.minBrokerIdInGroup) {
+                         offlineBrokerAddr = this.minBrokerAddrInGroup;
+                     }
+                     onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
+                 }
+             } finally {
+                 lock.unlock();
+             }
+
+         }
+     }
+
+     /**
+      * Updates the minimum broker if it changed, waiting up to 3000 ms for the lock. The update is
+      * dropped when the lock cannot be acquired in time or the waiting thread is interrupted.
+      */
+     @Override
+     public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+         String masterHaAddr) {
+         try {
+             if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+                 try {
+                     if (minBrokerId != this.minBrokerIdInGroup) {
+                         onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
+                     }
+                 } finally {
+                     lock.unlock();
+                 }
+
+             }
+         } catch (InterruptedException e) {
+             LOG.error("Update min broker error", e);
+             // Restore the interrupt flag so the caller can still observe the interruption.
+             Thread.currentThread().interrupt();
+         }
+     }
+
+     @Override
+     public BrokerController peekMasterBroker() {
+         return this.brokerContainer.peekMasterBroker();
+     }
+ }
diff --git a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
new file mode 100644
index 0000000..d4b4a75
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container.logback;
+
+import ch.qos.logback.classic.AsyncAppender;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.encoder.Encoder;
+import ch.qos.logback.core.rolling.FixedWindowRollingPolicy;
+import ch.qos.logback.core.rolling.RollingFileAppender;
+import ch.qos.logback.core.rolling.RollingPolicy;
+import ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP;
+import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
+import ch.qos.logback.core.rolling.TimeBasedFileNamingAndTriggeringPolicy;
+import ch.qos.logback.core.rolling.TimeBasedRollingPolicy;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Set;
+
+import ch.qos.logback.core.util.FileSize;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.slf4j.LoggerFactory;
+
+public class BrokerLogbackConfigurator {
+ private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private static final Set<String> CONFIGURED_BROKER_LIST = new HashSet<>();
+
+ public static final String ROCKETMQ_LOGS = "rocketmqlogs";
+ public static final String ROCKETMQ_PREFIX = "Rocketmq";
+ public static final String SUFFIX_CONSOLE = "Console";
+ public static final String SUFFIX_APPENDER = "Appender";
+ public static final String SUFFIX_INNER_APPENDER = "_inner";
+
+ public static void doConfigure(BrokerConfig brokerConfig) {
+ if (!CONFIGURED_BROKER_LIST.contains(brokerConfig.getCanonicalName())) {
+ try {
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ for (ch.qos.logback.classic.Logger tempLogger : lc.getLoggerList()) {
+ String loggerName = tempLogger.getName();
+ if (loggerName.startsWith(ROCKETMQ_PREFIX)
+ && !loggerName.endsWith(SUFFIX_CONSOLE)
+ && !loggerName.equals(LoggerName.ACCOUNT_LOGGER_NAME)
+ && !loggerName.equals(LoggerName.COMMERCIAL_LOGGER_NAME)
+ && !loggerName.equals(LoggerName.CONSUMER_STATS_LOGGER_NAME)) {
+ ch.qos.logback.classic.Logger logger = lc.getLogger(brokerConfig.getLoggerIdentifier() + loggerName);
+ logger.setAdditive(tempLogger.isAdditive());
+ logger.setLevel(tempLogger.getLevel());
+ String appenderName = loggerName + SUFFIX_APPENDER;
+ Appender<ILoggingEvent> tempAppender = tempLogger.getAppender(appenderName);
+ if (tempAppender instanceof AsyncAppender) {
+ AsyncAppender tempAsyncAppender = (AsyncAppender) tempAppender;
+ AsyncAppender asyncAppender = new AsyncAppender();
+ asyncAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+ asyncAppender.setContext(tempAsyncAppender.getContext());
+
+ String innerAppenderName = appenderName + SUFFIX_INNER_APPENDER;
+ Appender<ILoggingEvent> tempInnerAppender = tempAsyncAppender.getAppender(innerAppenderName);
+ if (!(tempInnerAppender instanceof RollingFileAppender)) {
+ continue;
+ }
+ asyncAppender.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempInnerAppender,
+ brokerConfig, innerAppenderName));
+ asyncAppender.start();
+ logger.addAppender(asyncAppender);
+ } else if (tempAppender instanceof RollingFileAppender) {
+ logger.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempAppender,
+ brokerConfig, appenderName));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerConfig.getCanonicalName(), e);
+ return;
+ }
+
+ CONFIGURED_BROKER_LIST.add(brokerConfig.getCanonicalName());
+ }
+ }
+
+ private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
+ RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerConfig brokerConfig, String appenderName)
+ throws NoSuchFieldException, IllegalAccessException {
+ RollingFileAppender<ILoggingEvent> rollingFileAppender = new RollingFileAppender<>();
+
+ // configure appender name
+ rollingFileAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+
+ // configure file name
+ rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+
+ // configure append
+ rollingFileAppender.setAppend(true);
+
+ // configure prudent
+ rollingFileAppender.setPrudent(tempRollingFileAppender.isPrudent());
+
+ // configure rollingPolicy
+ RollingPolicy originalRollingPolicy = tempRollingFileAppender.getRollingPolicy();
+ if (originalRollingPolicy instanceof TimeBasedRollingPolicy) {
+ TimeBasedRollingPolicy<ILoggingEvent> tempRollingPolicy = (TimeBasedRollingPolicy<ILoggingEvent>) originalRollingPolicy;
+ TimeBasedRollingPolicy<ILoggingEvent> rollingPolicy = new TimeBasedRollingPolicy<>();
+ rollingPolicy.setContext(tempRollingPolicy.getContext());
+ rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern());
+ SizeAndTimeBasedFNATP<ILoggingEvent> sizeAndTimeBasedFNATP = new SizeAndTimeBasedFNATP<>();
+ sizeAndTimeBasedFNATP.setContext(tempRollingPolicy.getContext());
+ TimeBasedFileNamingAndTriggeringPolicy<ILoggingEvent> timeBasedFileNamingAndTriggeringPolicy =
+ tempRollingPolicy.getTimeBasedFileNamingAndTriggeringPolicy();
+ if (timeBasedFileNamingAndTriggeringPolicy instanceof SizeAndTimeBasedFNATP) {
+ SizeAndTimeBasedFNATP<ILoggingEvent> originalSizeAndTimeBasedFNATP =
+ (SizeAndTimeBasedFNATP<ILoggingEvent>) timeBasedFileNamingAndTriggeringPolicy;
+ Field field = originalSizeAndTimeBasedFNATP.getClass().getDeclaredField("maxFileSize");
+ field.setAccessible(true);
+ sizeAndTimeBasedFNATP.setMaxFileSize((FileSize) field.get(originalSizeAndTimeBasedFNATP));
+ sizeAndTimeBasedFNATP.setTimeBasedRollingPolicy(rollingPolicy);
+ }
+ rollingPolicy.setTimeBasedFileNamingAndTriggeringPolicy(sizeAndTimeBasedFNATP);
+ rollingPolicy.setMaxHistory(tempRollingPolicy.getMaxHistory());
+ rollingPolicy.setParent(rollingFileAppender);
+ rollingPolicy.start();
+ rollingFileAppender.setRollingPolicy(rollingPolicy);
+ } else if (originalRollingPolicy instanceof FixedWindowRollingPolicy) {
+ FixedWindowRollingPolicy tempRollingPolicy = (FixedWindowRollingPolicy) originalRollingPolicy;
+ FixedWindowRollingPolicy rollingPolicy = new FixedWindowRollingPolicy();
+ rollingPolicy.setContext(tempRollingPolicy.getContext());
+ rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+ rollingPolicy.setMaxIndex(tempRollingPolicy.getMaxIndex());
+ rollingPolicy.setMinIndex(tempRollingPolicy.getMinIndex());
+ rollingPolicy.setParent(rollingFileAppender);
+ rollingPolicy.start();
+ rollingFileAppender.setRollingPolicy(rollingPolicy);
+ }
+
+ // configure triggerPolicy
+ if (tempRollingFileAppender.getTriggeringPolicy() instanceof SizeBasedTriggeringPolicy) {
+ SizeBasedTriggeringPolicy<ILoggingEvent> tempTriggerPolicy = (SizeBasedTriggeringPolicy<ILoggingEvent>) tempRollingFileAppender.getTriggeringPolicy();
+ SizeBasedTriggeringPolicy<ILoggingEvent> triggerPolicy = new SizeBasedTriggeringPolicy<>();
+ triggerPolicy.setContext(tempTriggerPolicy.getContext());
+ Field field = triggerPolicy.getClass().getDeclaredField("maxFileSize");
+ field.setAccessible(true);
+ triggerPolicy.setMaxFileSize((FileSize) field.get(triggerPolicy));
+ triggerPolicy.start();
+ rollingFileAppender.setTriggeringPolicy(triggerPolicy);
+ }
+
+ // configure encoder
+ Encoder<ILoggingEvent> tempEncoder = tempRollingFileAppender.getEncoder();
+ if (tempEncoder instanceof PatternLayoutEncoder) {
+ PatternLayoutEncoder tempPatternLayoutEncoder = (PatternLayoutEncoder) tempEncoder;
+ PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder();
+ patternLayoutEncoder.setContext(tempPatternLayoutEncoder.getContext());
+ patternLayoutEncoder.setPattern(tempPatternLayoutEncoder.getPattern());
+ patternLayoutEncoder.setCharset(tempPatternLayoutEncoder.getCharset());
+ patternLayoutEncoder.start();
+
+ rollingFileAppender.setEncoder(patternLayoutEncoder);
+ }
+
+ // configure context
+ rollingFileAppender.setContext(tempRollingFileAppender.getContext());
+
+ rollingFileAppender.start();
+
+ return rollingFileAppender;
+ }
+}
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java
new file mode 100644
index 0000000..8138098
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.assertj.core.util.Arrays;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+
+ /**
+  * Start-up tests for {@link BrokerContainerStartup}: a broker config file and a container
+  * config file are written to the temp directory before each test and removed afterwards.
+  */
+ @RunWith(MockitoJUnitRunner.class)
+ public class BrokerContainerStartupTest {
+     /** Files and directories created by the tests; deleted and forgotten after every test. */
+     private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+     private static final String BROKER_NAME_PREFIX = "TestBroker";
+     private static final String SHARED_BROKER_NAME_PREFIX = "TestBrokerContainer";
+     private static String brokerConfigPath;
+     private static String brokerContainerConfigPath;
+
+     @Mock
+     private BrokerConfig brokerConfig;
+     private String storePathRootDir = "store/test";
+     @Mock
+     private NettyClientConfig nettyClientConfig;
+     @Mock
+     private NettyServerConfig nettyServerConfig;
+
+     /**
+      * Writes a broker config file (broker plus message store properties) and a container config
+      * file that points at it.
+      */
+     @Before
+     public void init() throws IOException {
+         String brokerName = BROKER_NAME_PREFIX + "_" + System.currentTimeMillis();
+         // Named differently from the mock field above to avoid shadowing it.
+         BrokerConfig generatedBrokerConfig = new BrokerConfig();
+         generatedBrokerConfig.setBrokerName(brokerName);
+         if (generatedBrokerConfig.getRocketmqHome() == null) {
+             generatedBrokerConfig.setRocketmqHome("../distribution");
+         }
+         MessageStoreConfig storeConfig = new MessageStoreConfig();
+         String baseDir = createBaseDir(generatedBrokerConfig.getBrokerName() + "_" + generatedBrokerConfig.getBrokerId()).getAbsolutePath();
+         storeConfig.setStorePathRootDir(baseDir);
+         storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+
+         brokerConfigPath = tmpPath(brokerName);
+         generatedBrokerConfig.setBrokerConfigPath(brokerConfigPath);
+         TMP_FILE_LIST.add(new File(brokerConfigPath));
+         Properties brokerConfigProp = MixAll.object2Properties(generatedBrokerConfig);
+         Properties storeConfigProp = MixAll.object2Properties(storeConfig);
+
+         for (Object key : storeConfigProp.keySet()) {
+             brokerConfigProp.put(key, storeConfigProp.get(key));
+         }
+         MixAll.string2File(MixAll.properties2String(brokerConfigProp), brokerConfigPath);
+
+         brokerContainerConfigPath = tmpPath(SHARED_BROKER_NAME_PREFIX + System.currentTimeMillis());
+         BrokerContainerConfig brokerContainerConfig = new BrokerContainerConfig();
+         brokerContainerConfig.setBrokerConfigPaths(brokerConfigPath);
+         if (brokerContainerConfig.getRocketmqHome() == null) {
+             brokerContainerConfig.setRocketmqHome("../distribution");
+         }
+         TMP_FILE_LIST.add(new File(brokerContainerConfigPath));
+         Properties brokerContainerConfigProp = MixAll.object2Properties(brokerContainerConfig);
+         MixAll.string2File(MixAll.properties2String(brokerContainerConfigProp), brokerContainerConfigPath);
+     }
+
+     /** Deletes everything registered in {@link #TMP_FILE_LIST} and empties the list. */
+     @After
+     public void destory() {
+         for (File file : TMP_FILE_LIST) {
+             UtilAll.deleteFile(file);
+         }
+         // The list is static: without clearing it, every later test would walk stale entries again.
+         TMP_FILE_LIST.clear();
+     }
+
+     /** Starts a container from the container config file and expects exactly one broker in it. */
+     @Test
+     public void testStartBroker1() {
+         BrokerContainer brokerContainer = BrokerContainerStartup.startBrokerContainer(
+             BrokerContainerStartup.createBrokerContainer(Arrays.array("-c", brokerContainerConfigPath)));
+         assertThat(brokerContainer).isNotNull();
+         try {
+             List<BrokerController> brokers = BrokerContainerStartup.createAndStartBrokers(brokerContainer);
+             assertThat(brokers.size()).isEqualTo(1);
+         } finally {
+             // Always stop the container so a failed assertion does not leave it running.
+             brokerContainer.shutdown();
+         }
+         assertThat(brokerContainer.getBrokerControllers().size()).isEqualTo(0);
+     }
+
+     /** Starts a single broker from the broker config file and checks it is hosted by a container. */
+     @Test
+     public void testStartBroker2() {
+         InnerBrokerController brokerController = (InnerBrokerController) BrokerContainerStartup.start(BrokerContainerStartup.createBrokerController(Arrays.array("-c", brokerConfigPath)));
+         assertThat(brokerController).isNotNull();
+
+         try {
+             assertThat(brokerController.getBrokerContainer().getBrokerControllers().size()).isEqualTo(1);
+         } finally {
+             brokerController.getBrokerContainer().shutdown();
+         }
+         assertThat(brokerController.getBrokerContainer().getBrokerControllers().size()).isEqualTo(0);
+     }
+
+     /** Resolves {@code fileName} inside the JVM's temp directory instead of a hard-coded "/tmp/". */
+     private static String tmpPath(String fileName) {
+         return new File(System.getProperty("java.io.tmpdir"), fileName).getAbsolutePath();
+     }
+
+     /** Creates a temp directory with the given prefix and registers it for clean-up. */
+     private static File createBaseDir(String prefix) {
+         final File file;
+         try {
+             file = Files.createTempDirectory(prefix).toFile();
+             TMP_FILE_LIST.add(file);
+             System.out.printf("create file at %s%n", file.getAbsolutePath());
+             return file;
+         } catch (IOException e) {
+             throw new RuntimeException("Couldn't create tmp folder", e);
+         }
+     }
+
+     /** Removes a left-over relative store directory before each test. */
+     @Before
+     public void clear() {
+         UtilAll.deleteFile(new File(storePathRootDir));
+     }
+
+     /** Removes the relative store directory and its (then empty) parent after each test. */
+     @After
+     public void tearDown() {
+         File configFile = new File(storePathRootDir);
+         UtilAll.deleteFile(configFile);
+         UtilAll.deleteEmptyDirectory(configFile);
+         UtilAll.deleteEmptyDirectory(configFile.getParentFile());
+     }
+ }
\ No newline at end of file
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
new file mode 100644
index 0000000..42c934d
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+import static org.mockito.Mockito.*;
+
+ /**
+  * Tests for {@link BrokerContainer}: container life cycle, adding and removing master and slave
+  * brokers, and the topic permissions sent by {@code registerIncrementBrokerData}.
+  */
+ public class BrokerContainerTest {
+     /** Temp directories created by the tests; deleted and forgotten after every test. */
+     private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+     private static final Random RANDOM = new Random();
+     private static final Set<Integer> PORTS_IN_USE = new HashSet<>();
+
+     /**
+      * Tests if the container can be properly started and stopped.
+      *
+      * @throws Exception If fails.
+      */
+     @Test
+     public void testBrokerContainerRestart() throws Exception {
+         BrokerContainer brokerContainer = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(brokerContainer.initialize()).isTrue();
+         brokerContainer.start();
+         brokerContainer.shutdown();
+     }
+
+     @Test
+     public void testRegisterIncrementBrokerData() throws Exception {
+         BrokerController brokerController = new BrokerController(
+             new BrokerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig(),
+             new MessageStoreConfig());
+
+         BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
+         Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
+         field.setAccessible(true);
+         field.set(brokerController, brokerOuterAPI);
+
+         // Two plain topic configs: topic-0 and topic-1.
+         List<TopicConfig> topicConfigList = new ArrayList<>(2);
+         for (int i = 0; i < 2; i++) {
+             topicConfigList.add(new TopicConfig("topic-" + i));
+         }
+         DataVersion dataVersion = new DataVersion();
+
+         // Check normal condition.
+         testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+             topicConfigList, dataVersion, PermName.PERM_READ | PermName.PERM_WRITE, 1);
+         // Check unwritable broker.
+         testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+             topicConfigList, dataVersion, PermName.PERM_READ, 2);
+         // Check unreadable broker.
+         testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+             topicConfigList, dataVersion, PermName.PERM_WRITE, 3);
+     }
+
+     @Test
+     public void testRegisterIncrementBrokerDataPerm() throws Exception {
+         BrokerController brokerController = new BrokerController(
+             new BrokerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig(),
+             new MessageStoreConfig());
+
+         BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
+         Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
+         field.setAccessible(true);
+         field.set(brokerController, brokerOuterAPI);
+
+         // Two plain topic configs: topic-0 and topic-1.
+         List<TopicConfig> topicConfigList = new ArrayList<>(2);
+         for (int i = 0; i < 2; i++) {
+             topicConfigList.add(new TopicConfig("topic-" + i));
+         }
+         DataVersion dataVersion = new DataVersion();
+
+         brokerController.getBrokerConfig().setBrokerPermission(4);
+
+         brokerController.registerIncrementBrokerData(topicConfigList, dataVersion);
+         // Get topicConfigSerializeWrapper created by registerIncrementBrokerData() from brokerOuterAPI.registerBrokerAll()
+         ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
+         verify(brokerOuterAPI).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(), anyString(),
+             captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyBoolean());
+         TopicConfigSerializeWrapper wrapper = captor.getValue();
+         for (Map.Entry<String, TopicConfig> entry : wrapper.getTopicConfigTable().entrySet()) {
+             assertThat(entry.getValue().getPerm()).isEqualTo(brokerController.getBrokerConfig().getBrokerPermission());
+         }
+
+     }
+
+     @Test
+     public void testMasterScaleOut() throws Exception {
+         BrokerContainer brokerContainer = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(brokerContainer.initialize()).isTrue();
+         brokerContainer.getBrokerContainerConfig().setNamesrvAddr("127.0.0.1:9876");
+         brokerContainer.start();
+
+         BrokerConfig masterBrokerConfig = new BrokerConfig();
+         MessageStoreConfig messageStoreConfig = createStoreConfig("unnittest-master");
+
+         InnerBrokerController brokerController = null;
+         try {
+             brokerController = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+             assertThat(brokerController.isIsolated()).isFalse();
+         } finally {
+             // Release the container even when an assertion fails, like the *Failed tests do.
+             brokerContainer.shutdown();
+             destroyStore(brokerController);
+         }
+     }
+
+     @Test
+     public void testAddMasterFailed() throws Exception {
+         BrokerContainer brokerContainer = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(brokerContainer.initialize()).isTrue();
+         brokerContainer.start();
+
+         BrokerConfig masterBrokerConfig = new BrokerConfig();
+         // Reuse the container's own listen port; adding the broker is expected to fail.
+         masterBrokerConfig.setListenPort(brokerContainer.getNettyServerConfig().getListenPort());
+         // Created outside the try so a temp-dir failure is not mistaken for the expected exception.
+         MessageStoreConfig messageStoreConfig = createStoreConfig("unnittest-master");
+         boolean exceptionCaught = false;
+         try {
+             brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+         } catch (Exception e) {
+             exceptionCaught = true;
+         } finally {
+             brokerContainer.shutdown();
+
+         }
+
+         assertThat(exceptionCaught).isTrue();
+     }
+
+     @Test
+     public void testAddSlaveFailed() throws Exception {
+         BrokerContainer sharedBrokerController = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(sharedBrokerController.initialize()).isTrue();
+         sharedBrokerController.start();
+
+         BrokerConfig slaveBrokerConfig = new BrokerConfig();
+         slaveBrokerConfig.setBrokerId(1);
+         // Reuse the container's own listen port; adding the broker is expected to fail.
+         slaveBrokerConfig.setListenPort(sharedBrokerController.getNettyServerConfig().getListenPort());
+         MessageStoreConfig slaveMessageStoreConfig = createStoreConfig("unnittest-slave");
+         slaveMessageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
+         boolean exceptionCaught = false;
+         try {
+             sharedBrokerController.addBroker(slaveBrokerConfig, slaveMessageStoreConfig);
+         } catch (Exception e) {
+             exceptionCaught = true;
+         } finally {
+             sharedBrokerController.shutdown();
+         }
+
+         assertThat(exceptionCaught).isTrue();
+     }
+
+     @Test
+     public void testAddAndRemoveMaster() throws Exception {
+         BrokerContainer brokerContainer = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(brokerContainer.initialize()).isTrue();
+         brokerContainer.start();
+
+         BrokerConfig masterBrokerConfig = new BrokerConfig();
+         MessageStoreConfig messageStoreConfig = createStoreConfig("unnittest-master");
+
+         InnerBrokerController master = null;
+         try {
+             master = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+             assertThat(master).isNotNull();
+             master.start();
+             assertThat(master.isIsolated()).isFalse();
+
+             brokerContainer.removeBroker(new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(), masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId()));
+             assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+         } finally {
+             brokerContainer.shutdown();
+             destroyStore(master);
+         }
+     }
+
+     @Test
+     public void testAddAndRemoveSlaveSuccess() throws Exception {
+         BrokerContainer brokerContainer = new BrokerContainer(
+             new BrokerContainerConfig(),
+             new NettyServerConfig(),
+             new NettyClientConfig());
+         assertThat(brokerContainer.initialize()).isTrue();
+         brokerContainer.start();
+
+         BrokerConfig masterBrokerConfig = new BrokerConfig();
+         MessageStoreConfig messageStoreConfig = createStoreConfig("unnittest-master");
+
+         InnerBrokerController master = null;
+         InnerBrokerController slave = null;
+         try {
+             master = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+             assertThat(master).isNotNull();
+             master.start();
+             assertThat(master.isIsolated()).isFalse();
+
+             BrokerConfig slaveBrokerConfig = new BrokerConfig();
+             slaveBrokerConfig.setListenPort(generatePort(masterBrokerConfig.getListenPort(), 10000));
+             slaveBrokerConfig.setBrokerId(1);
+             MessageStoreConfig slaveMessageStoreConfig = createStoreConfig("unnittest-slave");
+             slaveMessageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
+             slaveMessageStoreConfig.setHaListenPort(generatePort(messageStoreConfig.getHaListenPort(), 10000));
+             slave = brokerContainer.addBroker(slaveBrokerConfig, slaveMessageStoreConfig);
+             assertThat(slave).isNotNull();
+             slave.start();
+             assertThat(slave.isIsolated()).isFalse();
+
+             brokerContainer.removeBroker(new BrokerIdentity(slaveBrokerConfig.getBrokerClusterName(), slaveBrokerConfig.getBrokerName(), slaveBrokerConfig.getBrokerId()));
+             assertThat(brokerContainer.getSlaveBrokers().size()).isEqualTo(0);
+
+             brokerContainer.removeBroker(new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(), masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId()));
+             assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+         } finally {
+             brokerContainer.shutdown();
+             destroyStore(slave);
+             destroyStore(master);
+         }
+     }
+
+     /**
+      * Builds a message store config whose root directory is a fresh temp directory (registered
+      * for clean-up) and whose commit log lives in its {@code commitlog} sub-directory.
+      */
+     private static MessageStoreConfig createStoreConfig(String dirPrefix) {
+         String baseDir = createBaseDir(dirPrefix).getAbsolutePath();
+         MessageStoreConfig storeConfig = new MessageStoreConfig();
+         storeConfig.setStorePathRootDir(baseDir);
+         storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+         return storeConfig;
+     }
+
+     /** Destroys the broker's message store; does nothing when the broker was never created. */
+     private static void destroyStore(InnerBrokerController broker) {
+         if (broker != null) {
+             broker.getMessageStore().destroy();
+         }
+     }
+
+     /** Creates a temp directory with the given prefix and registers it for clean-up. */
+     private static File createBaseDir(String prefix) {
+         final File file;
+         try {
+             file = Files.createTempDirectory(prefix).toFile();
+             TMP_FILE_LIST.add(file);
+             return file;
+         } catch (IOException e) {
+             throw new RuntimeException("Couldn't create tmp folder", e);
+         }
+     }
+
+     /**
+      * Picks a random port in {@code [base, base + range)} such that neither the port nor
+      * {@code port - 2} was handed out before, and records both as used.
+      *
+      * @param base  lowest candidate port
+      * @param range number of candidate ports above {@code base}
+      * @return the chosen port
+      */
+     public static int generatePort(int base, int range) {
+         int result = base + RANDOM.nextInt(range);
+         while (PORTS_IN_USE.contains(result) || PORTS_IN_USE.contains(result - 2)) {
+             result = base + RANDOM.nextInt(range);
+         }
+         PORTS_IN_USE.add(result);
+         PORTS_IN_USE.add(result - 2);
+         return result;
+     }
+
+     /** Deletes everything registered in {@link #TMP_FILE_LIST} and empties the list. */
+     @After
+     public void destory() {
+         for (File file : TMP_FILE_LIST) {
+             UtilAll.deleteFile(file);
+         }
+         // The list is static: without clearing it, every later test would walk stale entries again.
+         TMP_FILE_LIST.clear();
+     }
+
+     /**
+      * Sets the broker permission to {@code perm}, triggers an incremental registration and checks
+      * that the captured wrapper carries {@code dataVersion} and both topics with that permission.
+      *
+      * @param times how many {@code registerBrokerAll} invocations are expected on the mock so far
+      */
+     private void testRegisterIncrementBrokerDataWithPerm(BrokerController brokerController,
+         BrokerOuterAPI brokerOuterAPI,
+         List<TopicConfig> topicConfigList, DataVersion dataVersion, int perm, int times) {
+         brokerController.getBrokerConfig().setBrokerPermission(perm);
+
+         brokerController.registerIncrementBrokerData(topicConfigList, dataVersion);
+         // Get topicConfigSerializeWrapper created by registerIncrementBrokerData() from brokerOuterAPI.registerBrokerAll()
+         ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
+         verify(brokerOuterAPI, times(times)).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(),
+             anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyBoolean());
+         TopicConfigSerializeWrapper wrapper = captor.getValue();
+
+         // Give the expected configs the permission under test for the comparison below ...
+         for (TopicConfig topicConfig : topicConfigList) {
+             topicConfig.setPerm(perm);
+         }
+         assertThat(wrapper.getDataVersion()).isEqualTo(dataVersion);
+         assertThat(wrapper.getTopicConfigTable()).containsExactly(
+             entry("topic-0", topicConfigList.get(0)),
+             entry("topic-1", topicConfigList.get(1)));
+         // ... and reset them to read/write so the next invocation starts from the same state.
+         for (TopicConfig topicConfig : topicConfigList) {
+             topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+         }
+     }
+ }
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
new file mode 100644
index 0000000..6943d28
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Starts a {@link BrokerPreOnlineService} for an {@link InnerBrokerController} that has been
+ * forced into the isolated state and waits for the controller to leave that state.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerPreOnlineTest {
+ @Mock
+ private BrokerContainer brokerContainer;
+
+ // Built in init(); not a mock.
+ private InnerBrokerController innerBrokerController;
+
+ @Mock
+ private BrokerOuterAPI brokerOuterAPI;
+
+ /**
+ * Builds {@link #innerBrokerController} on top of the mocked container, marks it as isolated
+ * and injects a mocked message store. Called explicitly by the test (it carries no JUnit
+ * lifecycle annotation).
+ *
+ * @throws Exception if the reflective field access fails
+ */
+ public void init() throws Exception {
+ when(brokerContainer.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+ when(brokerContainer.getBrokerContainerConfig()).thenReturn(new BrokerContainerConfig());
+
+ // NOTE(review): the two member groups below are only consumed by the stubbing that is
+ // commented out further down, so they are currently unused.
+ BrokerMemberGroup brokerMemberGroup1 = new BrokerMemberGroup();
+ Map<Long, String> brokerAddrMap = new HashMap<>();
+ brokerAddrMap.put(1L, "127.0.0.1:20911");
+ brokerMemberGroup1.setBrokerAddrs(brokerAddrMap);
+
+ BrokerMemberGroup brokerMemberGroup2 = new BrokerMemberGroup();
+ brokerMemberGroup2.setBrokerAddrs(new HashMap<>());
+
+// when(brokerOuterAPI.syncBrokerMemberGroup(anyString(), anyString()))
+// .thenReturn(brokerMemberGroup1)
+// .thenReturn(brokerMemberGroup2);
+// doNothing().when(brokerOuterAPI).sendBrokerHaInfo(anyString(), anyString(), anyLong(), anyString());
+
+ // The store is a mock that only answers the two config getters with default configs.
+ DefaultMessageStore defaultMessageStore = mock(DefaultMessageStore.class);
+ when(defaultMessageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
+ when(defaultMessageStore.getBrokerConfig()).thenReturn(new BrokerConfig());
+
+// HAService haService = new DefaultHAService();
+// haService.init(defaultMessageStore);
+// haService.start();
+//
+// when(defaultMessageStore.getHaService()).thenReturn(haService);
+
+ innerBrokerController = new InnerBrokerController(brokerContainer,
+ defaultMessageStore.getBrokerConfig(),
+ defaultMessageStore.getMessageStoreConfig());
+
+ innerBrokerController.setTransactionalMessageCheckService(new TransactionalMessageCheckService(innerBrokerController));
+
+ // Force the controller into the isolated state by writing the field reflectively.
+ Field field = InnerBrokerController.class.getDeclaredField("isIsolated");
+ field.setAccessible(true);
+ field.set(innerBrokerController, true);
+
+ // Inject the mocked store into the field declared on the BrokerController superclass.
+ field = BrokerController.class.getDeclaredField("messageStore");
+ field.setAccessible(true);
+ field.set(innerBrokerController, defaultMessageStore);
+ }
+
+ /**
+ * Starts the pre-online service and waits at most 30 seconds for
+ * {@code innerBrokerController.isIsolated()} to become {@code false}.
+ */
+ @Test
+ public void testMasterOnlineConnTimeout() throws Exception {
+ init();
+ BrokerPreOnlineService brokerPreOnlineService = new BrokerPreOnlineService(innerBrokerController);
+
+ brokerPreOnlineService.start();
+
+ await().atMost(Duration.ofSeconds(30)).until(() -> !innerBrokerController.isIsolated());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 880df2d..5f6c4ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
<module>logging</module>
<module>acl</module>
<module>example</module>
+ <module>container</module>
</modules>
<build>
@@ -460,6 +461,11 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-container</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${project.version}</version>
</dependency>
@@ -531,7 +537,7 @@
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
- <version>1.2</version>
+ <version>1.4</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -566,6 +572,11 @@
<version>3.4</version>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.6</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
@@ -610,8 +621,11 @@
<artifactId>commons-validator</artifactId>
<version>1.7</version>
</dependency>
-
-
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.1.11</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
index 6292fc0..0f17858 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
@@ -23,5 +23,7 @@ public interface RPCHook {
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
- final RemotingCommand response);
+ final RemotingCommand response);
+
+ void doAfterRpcFailure(final String remoteAddr, RemotingCommand request, Boolean remoteTimeout);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c0754db..9f6933d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -17,12 +17,14 @@
package org.apache.rocketmq.remoting;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
@@ -31,6 +33,8 @@ public interface RemotingClient extends RemotingService {
List<String> getNameServerAddressList();
+ List<String> getAvailableNameSrvList();
+
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
@@ -48,7 +52,12 @@ public interface RemotingClient extends RemotingService {
void setCallbackExecutor(final ExecutorService callbackExecutor);
- ExecutorService getCallbackExecutor();
-
boolean isChannelWritable(final String addr);
+
+ void closeChannels();
+
+ void closeChannels(final List<String> addrList);
+
+ ConcurrentMap<Integer, ResponseFuture> getResponseTable();
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index a12c089..36e2035 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -36,6 +36,12 @@ public interface RemotingServer extends RemotingService {
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
+ Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair();
+
+ RemotingServer newRemotingServer(int port);
+
+ void removeRemotingServer(int port);
+
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
index 2f88797..c718f2e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
@@ -23,4 +23,9 @@ public interface RemotingService {
void shutdown();
void registerRPCHook(RPCHook rpcHook);
+
+ /**
+ * Remove all rpc hooks.
+ */
+ void clearRPCHook();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 4654e49..4c8a62a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -189,6 +189,19 @@ public class RemotingHelper {
return "";
}
+ /**
+ * Returns the part of {@code address} that precedes its first {@code ':'}.
+ *
+ * @param address address text such as {@code host:port}; may be {@code null}
+ * @return the text before the first colon, the whole string when it contains no colon,
+ * or {@code ""} when {@code address} is {@code null}
+ */
+ public static String parseHostFromAddress(String address) {
+ if (address == null) {
+ return "";
+ }
+
+ final int colonIndex = address.indexOf(':');
+ return colonIndex < 0 ? address : address.substring(0, colonIndex);
+ }
+
public static String parseSocketAddressAddr(SocketAddress socketAddress) {
if (socketAddress != null) {
// Default toString of InetSocketAddress is "hostName/IP:port"
@@ -199,4 +212,29 @@ public class RemotingHelper {
return "";
}
+ /**
+ * Reads the port of a socket address.
+ *
+ * @param socketAddress address to inspect; may be {@code null}
+ * @return the port when {@code socketAddress} is an {@link InetSocketAddress}, otherwise {@code -1}
+ */
+ public static int parseSocketAddressPort(SocketAddress socketAddress) {
+ if (!(socketAddress instanceof InetSocketAddress)) {
+ return -1;
+ }
+ final InetSocketAddress inetAddress = (InetSocketAddress) socketAddress;
+ return inetAddress.getPort();
+ }
+
+
+ /**
+ * Packs the first four dot-separated decimal fields of {@code ip} into one {@code int},
+ * with the first field in the most significant byte. Field values are not range-checked.
+ *
+ * @param ip dotted text such as {@code 192.168.0.1}
+ * @return the packed value
+ * @throws NumberFormatException if one of the first four fields is not a decimal integer
+ * @throws ArrayIndexOutOfBoundsException if {@code ip} has fewer than four fields
+ */
+ public static int ipToInt(String ip) {
+ final String[] octets = ip.split("\\.");
+ int packed = 0;
+ for (int i = 0; i < 4; i++) {
+ // Shifting the accumulator distributes over the OR, so this equals OR-ing each
+ // field shifted by 24, 16, 8 and 0 bits.
+ packed = (packed << 8) | Integer.parseInt(octets[i]);
+ }
+ return packed;
+ }
+
+ /**
+ * Tells whether {@code ip} lies inside the network described by {@code cidr}.
+ *
+ * @param ip dotted address, parsed with {@link #ipToInt(String)}
+ * @param cidr network in {@code address/prefixLength} form, e.g. {@code 10.0.0.0/8}
+ * @return {@code true} when both addresses agree on the leading {@code prefixLength} bits
+ * @throws NumberFormatException if the prefix length or an address field is not a decimal integer
+ * @throws ArrayIndexOutOfBoundsException if {@code cidr} contains no {@code '/'}
+ */
+ public static boolean ipInCIDR(String ip, String cidr) {
+ int ipAddr = ipToInt(ip);
+ String[] cidrArr = cidr.split("/");
+ int netId = Integer.parseInt(cidrArr[1]);
+ // Java uses only the low five bits of an int shift count, so "<< 32" is a no-op and
+ // would leave the mask as all ones. A /0 prefix has to match every address instead.
+ int mask = netId == 0 ? 0 : 0xFFFFFFFF << (32 - netId);
+ int cidrIpAddr = ipToInt(cidrArr[0]);
+
+ return (ipAddr & mask) == (cidrIpAddr & mask);
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index d5ce20b..53301e8 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -218,13 +218,17 @@ public class RemotingUtil {
+ /**
+ * Closes {@code channel}. When a remote address could be parsed for it, a listener logs
+ * that address together with the outcome of the close; otherwise the channel is closed
+ * without logging.
+ *
+ * @param channel the channel to close
+ */
public static void closeChannel(Channel channel) {
final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
- channel.close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
- future.isSuccess());
- }
- });
+ // Compare by content: "==" on strings tests object identity and only holds when the
+ // callee hands back the very same interned "" literal.
+ if ("".equals(addrRemote)) {
+ channel.close();
+ } else {
+ channel.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
+ future.isSuccess());
+ }
+ });
+ }
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index c1b9345..2f123db 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -38,11 +38,17 @@ public class NettyClientConfig {
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = NettySystemConfig.clientCloseSocketIfTimeout;
+ private boolean preferredDirectByteBuffer = false;
+ private boolean defaultEventExecutorGroupEnable = true;
+
private boolean useTLS;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
+ private boolean disableCallbackExecutor = false;
+ private boolean disableNettyWorkerGroup = false;
+
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -154,4 +160,36 @@ public class NettyClientConfig {
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
+
+ /**
+ * @return the {@code preferredDirectByteBuffer} flag
+ */
+ public boolean isPreferredDirectByteBuffer() {
+ return preferredDirectByteBuffer;
+ }
+
+ /**
+ * @param preferredDirectByteBuffer new value of the {@code preferredDirectByteBuffer} flag
+ */
+ public void setPreferredDirectByteBuffer(final boolean preferredDirectByteBuffer) {
+ this.preferredDirectByteBuffer = preferredDirectByteBuffer;
+ }
+
+ /**
+ * @return the {@code defaultEventExecutorGroupEnable} flag
+ */
+ public boolean isDefaultEventExecutorGroupEnable() {
+ return defaultEventExecutorGroupEnable;
+ }
+
+ /**
+ * @param defaultEventExecutorGroupEnable new value of the {@code defaultEventExecutorGroupEnable} flag
+ */
+ public void setDefaultEventExecutorGroupEnable(final boolean defaultEventExecutorGroupEnable) {
+ this.defaultEventExecutorGroupEnable = defaultEventExecutorGroupEnable;
+ }
+
+ /**
+ * @return the {@code disableCallbackExecutor} flag
+ */
+ public boolean isDisableCallbackExecutor() {
+ return disableCallbackExecutor;
+ }
+
+ /**
+ * @param disableCallbackExecutor new value of the {@code disableCallbackExecutor} flag
+ */
+ public void setDisableCallbackExecutor(boolean disableCallbackExecutor) {
+ this.disableCallbackExecutor = disableCallbackExecutor;
+ }
+
+ /**
+ * @return the {@code disableNettyWorkerGroup} flag
+ */
+ public boolean isDisableNettyWorkerGroup() {
+ return disableNettyWorkerGroup;
+ }
+
+ /**
+ * @param disableNettyWorkerGroup new value of the {@code disableNettyWorkerGroup} flag
+ */
+ public void setDisableNettyWorkerGroup(boolean disableNettyWorkerGroup) {
+ this.disableNettyWorkerGroup = disableNettyWorkerGroup;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index f02518b..c6c9178 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -88,7 +88,8 @@ public abstract class NettyRemotingAbstract {
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
- * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+ * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+ * code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
@@ -102,7 +103,6 @@ public abstract class NettyRemotingAbstract {
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
-
static {
NettyLogger.initNettyLogger();
}
@@ -168,20 +168,27 @@ public abstract class NettyRemotingAbstract {
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
- for (RPCHook rpcHook: rpcHooks) {
+ for (RPCHook rpcHook : rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}
- protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
+ public void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
- for (RPCHook rpcHook: rpcHooks) {
+ for (RPCHook rpcHook : rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}
+ /**
+ * Notifies every registered RPC hook that a request failed, in registration order.
+ *
+ * @param addr remote address forwarded to each hook
+ * @param request the request that failed
+ * @param remoteTimeout forwarded unchanged to each hook
+ */
+ public void doAfterRpcFailure(String addr, RemotingCommand request, Boolean remoteTimeout) {
+ if (rpcHooks.isEmpty()) {
+ return;
+ }
+ for (RPCHook hook : rpcHooks) {
+ hook.doAfterRpcFailure(addr, request, remoteTimeout);
+ }
+ }
/**
* Process incoming request command issued by remote peer.
@@ -198,43 +205,55 @@ public abstract class NettyRemotingAbstract {
Runnable run = new Runnable() {
@Override
public void run() {
+ Exception exception = null;
+ RemotingCommand response;
+
try {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- doBeforeRpcHooks(remoteAddr, cmd);
- final RemotingResponseCallback callback = new RemotingResponseCallback() {
- @Override
- public void callback(RemotingCommand response) {
- doAfterRpcHooks(remoteAddr, cmd, response);
- if (!cmd.isOnewayRPC()) {
- if (response != null) {
- response.setOpaque(opaque);
- response.markResponseType();
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("process request over, but response failed", e);
- log.error(cmd.toString());
- log.error(response.toString());
- }
- } else {
- }
+ try {
+ doBeforeRpcHooks(remoteAddr, cmd);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception == null) {
+ response = pair.getObject1().processRequest(ctx, cmd);
+ } else {
+ response = RemotingCommand.createResponseCommand(null);
+ response.setCode(RemotingSysResponseCode.SYSTEM_ERROR);
+ }
+
+ try {
+ doAfterRpcHooks(remoteAddr, cmd, response);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+
+ if (!cmd.isOnewayRPC()) {
+ if (response != null) {
+ response.setOpaque(opaque);
+ response.markResponseType();
+ try {
+ ctx.writeAndFlush(response);
+ } catch (Throwable e) {
+ log.error("process request over, but response failed", e);
+ log.error(cmd.toString());
+ log.error(response.toString());
}
+ } else {
+
}
- };
- if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
- AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
- processor.asyncProcessRequest(ctx, cmd, callback);
- } else {
- NettyRequestProcessor processor = pair.getObject1();
- RemotingCommand response = processor.processRequest(ctx, cmd);
- callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+ response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
@@ -344,29 +363,24 @@ public abstract class NettyRemotingAbstract {
}
}
-
-
- /**
- * Custom RPC hook.
- * Just be compatible with the previous version, use getRPCHooks instead.
- */
- @Deprecated
- protected RPCHook getRPCHook() {
- if (rpcHooks.size() > 0) {
- return rpcHooks.get(0);
- }
- return null;
- }
-
/**
* Custom RPC hooks.
*
* @return RPC hooks if specified; null otherwise.
*/
- public List<RPCHook> getRPCHooks() {
+ public List<RPCHook> getRPCHook() {
return rpcHooks;
}
+ /**
+ * Appends {@code rpcHook} to the hook list unless it is {@code null} or already present.
+ *
+ * @param rpcHook hook to register; {@code null} is ignored
+ */
+ public void registerRPCHook(RPCHook rpcHook) {
+ if (rpcHook == null || rpcHooks.contains(rpcHook)) {
+ return;
+ }
+ rpcHooks.add(rpcHook);
+ }
+
+ /**
+ * Removes every registered RPC hook from the hook list.
+ */
+ public void clearRPCHook() {
+ rpcHooks.clear();
+ }
/**
* This method specifies thread pool to use while invoking callback methods.
@@ -481,6 +495,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
+ if (this instanceof NettyRemotingClient) {
+ NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) this;
+ nettyRemotingClient.doAfterRpcFailure(RemotingHelper.parseChannelRemoteAddr(channel), request, false);
+ }
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
@@ -513,13 +531,14 @@ public abstract class NettyRemotingAbstract {
/**
* mark the request of the specified channel as fail and to invoke fail callback immediately
+ *
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> entry = it.next();
- if (entry.getValue().getProcessChannel() == channel) {
+ if (entry.getValue().getChannel() == channel) {
Integer opaque = entry.getKey();
if (opaque != null) {
requestFail(opaque);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index ce83769..455d5b4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
@@ -35,20 +36,26 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +65,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -70,7 +76,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -83,18 +89,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ClientHouseKeepingService", true);
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+ private final ConcurrentMap<String, Boolean> availableNamesrvAddrMap = new ConcurrentHashMap<String, Boolean>();
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock namesrvChannelLock = new ReentrantLock();
private final ExecutorService publicExecutor;
+ private final ExecutorService scanExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
- private DefaultEventExecutorGroup defaultEventExecutorGroup;
+ private EventExecutorGroup defaultEventExecutorGroup;
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
@@ -102,6 +110,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
+ this(nettyClientConfig, channelEventListener, null, null);
+ }
+
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
+ final ChannelEventListener channelEventListener,
+ final EventLoopGroup eventLoopGroup,
+ final EventExecutorGroup eventExecutorGroup) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
@@ -112,7 +127,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
@@ -120,23 +135,39 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
- this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(32), new ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
+ }
}
- });
+ );
+
+ if (eventLoopGroup != null) {
+ this.eventLoopGroupWorker = eventLoopGroup;
+ } else {
+ this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ }
+ this.defaultEventExecutorGroup = eventExecutorGroup;
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
- log.info("SSL enabled for client");
+ LOGGER.info("SSL enabled for client");
} catch (IOException e) {
- log.error("Failed to create SSLContext", e);
+ LOGGER.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
- log.error("Failed to create SSLContext", e);
+ LOGGER.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
@@ -150,19 +181,19 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void start() {
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyClientConfig.getClientWorkerThreads(),
- new ThreadFactory() {
-
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
- }
- });
+ if (this.defaultEventExecutorGroup == null) {
+ this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
+ nettyClientConfig.getClientWorkerThreads(),
+ new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
+ }
+ });
+ }
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
@@ -174,49 +205,80 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
- log.info("Prepend SSL handler");
+ LOGGER.info("Prepend SSL handler");
} else {
- log.warn("Connections are insecure as SSLContext is null!");
+ LOGGER.warn("Connections are insecure as SSLContext is null!");
}
}
- pipeline.addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
- new NettyConnectManageHandler(),
+ if (nettyClientConfig.isDefaultEventExecutorGroupEnable() && !nettyClientConfig.isDisableNettyWorkerGroup()) {
+ ch.pipeline().addLast(defaultEventExecutorGroup);
+ }
+ ch.pipeline().addLast(//
+ new NettyEncoder(), //
+ new NettyDecoder(), //
+ new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
+ new NettyConnectManageHandler(), //
new NettyClientHandler());
}
});
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
- log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
+ LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
}
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
- log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
+ LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
}
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
- log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
+ LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
}
+ if (nettyClientConfig.getClientSocketSndBufSize() != 0) {
+ handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
+ }
+ if (nettyClientConfig.getClientSocketRcvBufSize() != 0) {
+ handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
+ }
+ if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {
+ handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ }
+
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
- log.error("scanResponseTable exception", e);
+ LOGGER.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
- if (this.channelEventListener != null) {
- this.nettyEventExecutor.start();
- }
+// this.timer.scheduleAtFixedRate(new TimerTask() {
+// @Override
+// public void run() {
+// try {
+// NettyRemotingClient.this.scanChannelTablesOfNameServer();
+// } catch (Exception e) {
+// LOGGER.error("scanChannelTablesOfNameServer exception", e);
+// }
+// }
+// }, 1000 * 3, 10 * 1000);
+
+ this.timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ NettyRemotingClient.this.scanAvailableNameSrv();
+ } catch (Exception e) {
+ LOGGER.error("scanAvailableNameSrv exception", e);
+ }
+ }
+ }, 1000 * 3, this.nettyClientConfig.getConnectTimeoutMillis());
+
}
@Override
@@ -224,8 +286,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
try {
this.timer.cancel();
- for (ChannelWrapper cw : this.channelTables.values()) {
- this.closeChannel(null, cw.getChannel());
+ for (String addr : this.channelTables.keySet()) {
+ this.closeChannel(addr, this.channelTables.get(addr).getChannel());
}
this.channelTables.clear();
@@ -240,21 +302,30 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
- log.error("NettyRemotingClient shutdown exception, ", e);
+ LOGGER.error("NettyRemotingClient shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
- log.error("NettyRemotingServer shutdown exception, ", e);
+ LOGGER.error("NettyRemotingServer shutdown exception, ", e);
+ }
+ }
+
+ if (this.scanExecutor != null) {
+ try {
+ this.scanExecutor.shutdown();
+ } catch (Exception e) {
+ LOGGER.error("NettyRemotingServer shutdown exception, ", e);
}
}
}
public void closeChannel(final String addr, final Channel channel) {
- if (null == channel)
+ if (null == channel) {
return;
+ }
final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
@@ -264,46 +335,40 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
boolean removeItemFromTable = true;
final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
- log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+ LOGGER.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
if (null == prevCW) {
- log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+ LOGGER.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
removeItemFromTable = false;
} else if (prevCW.getChannel() != channel) {
- log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+ LOGGER.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
addrRemote);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
- log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
}
RemotingUtil.closeChannel(channel);
} catch (Exception e) {
- log.error("closeChannel: close the channel exception", e);
+ LOGGER.error("closeChannel: close the channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
- log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ LOGGER.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
- log.error("closeChannel exception", e);
- }
- }
-
- @Override
- public void registerRPCHook(RPCHook rpcHook) {
- if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
- rpcHooks.add(rpcHook);
+ LOGGER.error("closeChannel exception", e);
}
}
public void closeChannel(final Channel channel) {
- if (null == channel)
+ if (null == channel) {
return;
+ }
try {
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
@@ -324,25 +389,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
if (null == prevCW) {
- log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+ LOGGER.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
- log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
RemotingUtil.closeChannel(channel);
}
} catch (Exception e) {
- log.error("closeChannel: close the channel exception", e);
+ LOGGER.error("closeChannel: close the channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
- log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ LOGGER.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
- log.error("closeChannel exception", e);
+ LOGGER.error("closeChannel exception", e);
}
}
@@ -366,11 +431,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (update) {
Collections.shuffle(addrs);
- log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
+ LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
this.namesrvAddrList.set(addrs);
- if (!addrs.contains(this.namesrvAddrChoosed.get())) {
- this.namesrvAddrChoosed.set(null);
+ // Close the channel if the previously chosen name server address is no longer in the updated list.
+ if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {
+ String namesrvAddr = this.namesrvAddrChoosed.get();
+ for (String addr : this.channelTables.keySet()) {
+ if (addr.contains(namesrvAddr)) {
+ ChannelWrapper channelWrapper = this.channelTables.get(addr);
+ if (channelWrapper != null) {
+ closeChannel(channelWrapper.getChannel());
+ }
+ }
+ }
}
}
}
@@ -390,17 +464,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+ this.updateChannelLastResponseTime(addr);
return response;
} catch (RemotingSendRequestException e) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+ LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+ doAfterRpcFailure(addr, request, false);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+ LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+ doAfterRpcFailure(addr, request, true);
+ LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
@@ -409,7 +486,54 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
- private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
+ @Override
+ public void closeChannels() {
+ closeChannels(new ArrayList<String>(this.channelTables.keySet()));
+ }
+
+ @Override
+ public void closeChannels(List<String> addrList) {
+ for (String addr : addrList) {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw == null) {
+ continue;
+ }
+ this.closeChannel(addr, cw.getChannel());
+ }
+ interruptPullRequests(new HashSet<String>(addrList));
+ }
+
+ private void interruptPullRequests(Set<String> brokerAddrSet) {
+ for (ResponseFuture responseFuture : responseTable.values()) {
+ RemotingCommand cmd = responseFuture.getRequestCommand();
+ if (cmd == null) {
+ continue;
+ }
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(responseFuture.getChannel());
+ // interrupt only pull message requests
+ if (brokerAddrSet.contains(remoteAddr) && (cmd.getCode() == 11 || cmd.getCode() == 361)) {
+ LOGGER.info("interrupt {}", cmd);
+ responseFuture.interrupt();
+ }
+ }
+ }
+
+ private void updateChannelLastResponseTime(final String addr) {
+ String address = addr;
+ if (address == null) {
+ address = this.namesrvAddrChoosed.get();
+ }
+ if (address == null) {
+ LOGGER.warn("[updateChannelLastResponseTime] could not find address!!");
+ return;
+ }
+ ChannelWrapper channelWrapper = this.channelTables.get(address);
+ if (channelWrapper != null && channelWrapper.isOK()) {
+ channelWrapper.updateLastResponseTime();
+ }
+ }
+
+ private Channel getAndCreateChannel(final String addr) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
}
@@ -422,7 +546,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.createChannel(addr);
}
- private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
+ private Channel getAndCreateNameserverChannel() throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
@@ -450,7 +574,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
- log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
+ LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
@@ -458,11 +582,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
throw new RemotingConnectException(addrList.toString());
}
+ } catch (Exception e) {
+ LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally {
this.namesrvChannelLock.unlock();
}
} else {
- log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
return null;
@@ -494,30 +620,30 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (createNewConnection) {
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
- log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+ LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
- log.error("createChannel: create channel exception", e);
+ LOGGER.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
- log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
- log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+ LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
- log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+ LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
}
} else {
- log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+ LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}
@@ -538,9 +664,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + addr + "] timeout");
}
- this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
+ this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} catch (RemotingSendRequestException e) {
- log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+ LOGGER.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
@@ -559,7 +685,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
- log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+ LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+ doAfterRpcFailure(addr, request, false);
this.closeChannel(addr, channel);
throw e;
}
@@ -595,12 +722,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
+ public List<String> getAvailableNameSrvList() {
+ return new ArrayList<String>(this.availableNamesrvAddrMap.keySet());
+ }
+
+ @Override
public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
@Override
public ExecutorService getCallbackExecutor() {
+ if (nettyClientConfig.isDisableCallbackExecutor()) {
+ return null;
+ }
return callbackExecutor != null ? callbackExecutor : publicExecutor;
}
@@ -609,11 +744,67 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.callbackExecutor = callbackExecutor;
}
+ @Override
+ public ConcurrentMap<Integer, ResponseFuture> getResponseTable() {
+ return this.responseTable;
+ }
+
+ protected void scanChannelTablesOfNameServer() {
+ List<String> nameServerList = this.namesrvAddrList.get();
+ if (nameServerList == null) {
+ LOGGER.warn("[SCAN] Addresses of name server is empty!");
+ return;
+ }
+
+ for (String addr : this.channelTables.keySet()) {
+ ChannelWrapper channelWrapper = this.channelTables.get(addr);
+ if (channelWrapper == null) {
+ continue;
+ }
+
+ if ((System.currentTimeMillis() - channelWrapper.getLastResponseTime()) > this.nettyClientConfig.getChannelNotActiveInterval()) {
+ LOGGER.warn("[SCAN] No response after {} from name server {}, so close it!", channelWrapper.getLastResponseTime(),
+ addr);
+ closeChannel(addr, channelWrapper.getChannel());
+ }
+ }
+ }
+
+ private void scanAvailableNameSrv() {
+ List<String> nameServerList = this.namesrvAddrList.get();
+ if (nameServerList == null) {
+ LOGGER.warn("scanAvailableNameSrv Addresses of name server is empty!");
+ return;
+ }
+
+ for (final String namesrvAddr : nameServerList) {
+ scanExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);
+ if (channel != null) {
+ NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);
+ } else {
+ NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);
+ }
+ } catch (Exception e) {
+ LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);
+ }
+ }
+ });
+ }
+
+ }
+
static class ChannelWrapper {
private final ChannelFuture channelFuture;
+ // Updated only when a sync or async request receives a response; oneway requests do not update it.
+ private long lastResponseTime;
public ChannelWrapper(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
+ this.lastResponseTime = System.currentTimeMillis();
}
public boolean isOK() {
@@ -631,6 +822,33 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public ChannelFuture getChannelFuture() {
return channelFuture;
}
+
+ public long getLastResponseTime() {
+ return this.lastResponseTime;
+ }
+
+ public void updateLastResponseTime() {
+ this.lastResponseTime = System.currentTimeMillis();
+ }
+ }
+
+ class InvokeCallbackWrapper implements InvokeCallback {
+
+ private final InvokeCallback invokeCallback;
+ private final String addr;
+
+ public InvokeCallbackWrapper(InvokeCallback invokeCallback, String addr) {
+ this.invokeCallback = invokeCallback;
+ this.addr = addr;
+ }
+
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ if (responseFuture != null && responseFuture.isSendRequestOK() && responseFuture.getResponseCommand() != null) {
+ NettyRemotingClient.this.updateChannelLastResponseTime(addr);
+ }
+ this.invokeCallback.operationComplete(responseFuture);
+ }
}
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@@ -647,7 +865,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
- log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
+ LOGGER.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
@@ -659,7 +877,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
closeChannel(ctx.channel());
super.disconnect(ctx, promise);
@@ -671,7 +889,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
@@ -686,7 +904,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+ LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
@@ -701,8 +919,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
- log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+ LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+ LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 22440af..394b536 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -45,15 +45,17 @@ import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -77,6 +79,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
+ /**
+ * NettyRemotingServer may hold multiple SubRemotingServers; each server is stored in this table with its
+ * listen port as the key.
+ */
+ private ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<Integer, NettyRemotingAbstract>();
private int port = 0;
@@ -156,6 +163,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
loadSslContext();
+
+ this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
}
public void loadSslContext() {
@@ -199,7 +208,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
+ .option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
@@ -294,13 +303,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@Override
- public void registerRPCHook(RPCHook rpcHook) {
- if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
- rpcHooks.add(rpcHook);
- }
- }
-
- @Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
@@ -327,6 +329,28 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@Override
+ public Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair() {
+ return defaultRequestProcessor;
+ }
+
+ @Override
+ public RemotingServer newRemotingServer(final int port) {
+ SubRemotingServer remotingServer = new SubRemotingServer(port,
+ this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+ this.nettyServerConfig.getServerAsyncSemaphoreValue());
+ NettyRemotingAbstract existingServer = this.remotingServerTable.putIfAbsent(port, remotingServer);
+ if (existingServer != null) {
+ throw new RuntimeException("The port " + port + " already in use by another RemotingServer");
+ }
+ return remotingServer;
+ }
+
+ @Override
+ public void removeRemotingServer(final int port) {
+ this.remotingServerTable.remove(port);
+ }
+
+ @Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
@@ -349,7 +373,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
-
@Override
public ExecutorService getCallbackExecutor() {
return this.publicExecutor;
@@ -430,7 +453,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
- processMessageReceived(ctx, msg);
+ int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
+ NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
+ if (localPort != -1 && remotingAbstract != null) {
+ remotingAbstract.processMessageReceived(ctx, msg);
+ return;
+ }
+ // The related remoting server has been shut down, so close the connected channel
+ RemotingUtil.closeChannel(ctx.channel());
}
}
@@ -503,4 +533,110 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
RemotingUtil.closeChannel(ctx.channel());
}
}
+
+ /**
+ * The NettyRemotingServer supports binding multiple ports, each port bound by a SubRemotingServer. The
+ * SubRemotingServer will delegate all the functions to NettyRemotingServer, so the sub server can share all the
+ * resources from its parent server.
+ */
+ class SubRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+ private final int listenPort;
+ private volatile Channel serverChannel;
+
+ SubRemotingServer(final int port, final int permitsOnway, final int permitsAsync) {
+ super(permitsOnway, permitsAsync);
+ listenPort = port;
+ }
+
+ @Override
+ public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+ final ExecutorService executor) {
+ ExecutorService executorThis = executor;
+ if (null == executor) {
+ executorThis = NettyRemotingServer.this.publicExecutor;
+ }
+
+ Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+ this.processorTable.put(requestCode, pair);
+ }
+
+ @Override
+ public void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor) {
+ this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+ }
+
+ @Override
+ public int localListenPort() {
+ return listenPort;
+ }
+
+ @Override
+ public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode) {
+ return this.processorTable.get(requestCode);
+ }
+
+ @Override
+ public Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair() {
+ return this.defaultRequestProcessor;
+ }
+
+ @Override
+ public RemotingServer newRemotingServer(final int port) {
+ throw new UnsupportedOperationException("The SubRemotingServer of NettyRemotingServer " +
+ "doesn't support new nested RemotingServer");
+ }
+
+ @Override
+ public void removeRemotingServer(final int port) {
+ throw new UnsupportedOperationException("The SubRemotingServer of NettyRemotingServer " +
+ "doesn't support remove nested RemotingServer");
+ }
+
+ @Override
+ public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
+ final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+ return this.invokeSyncImpl(channel, request, timeoutMillis);
+ }
+
+ @Override
+ public void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+ final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+ }
+
+ @Override
+ public void invokeOneway(final Channel channel, final RemotingCommand request,
+ final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ this.invokeOnewayImpl(channel, request, timeoutMillis);
+ }
+
+ @Override
+ public void start() {
+ try {
+ this.serverChannel = NettyRemotingServer.this.serverBootstrap.bind(listenPort).sync().channel();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("this.subRemotingServer.serverBootstrap.bind().sync() InterruptedException", e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (this.serverChannel != null) {
+ try {
+ this.serverChannel.close().await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ @Override
+ public ChannelEventListener getChannelEventListener() {
+ return NettyRemotingServer.this.getChannelEventListener();
+ }
+
+ @Override
+ public ExecutorService getCallbackExecutor() {
+ return NettyRemotingServer.this.getCallbackExecutor();
+ }
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
index 4800689..040f768 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -27,5 +27,4 @@ public interface NettyRequestProcessor {
throws Exception;
boolean rejectRequest();
-
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 5f4c8c6..19f705d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -25,8 +25,9 @@ import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
+ private final Channel channel;
private final int opaque;
- private final Channel processChannel;
+ private final RemotingCommand request;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
@@ -38,11 +39,18 @@ public class ResponseFuture {
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
+ private volatile boolean interrupted = false;
public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
- SemaphoreReleaseOnlyOnce once) {
+ SemaphoreReleaseOnlyOnce once) {
+ this(channel, opaque, null, timeoutMillis, invokeCallback, once);
+ }
+
+ public ResponseFuture(Channel channel, int opaque, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback,
+ SemaphoreReleaseOnlyOnce once) {
+ this.channel = channel;
this.opaque = opaque;
- this.processChannel = channel;
+ this.request = request;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
@@ -56,6 +64,11 @@ public class ResponseFuture {
}
}
+ public void interrupt() {
+ interrupted = true;
+ executeInvokeCallback();
+ }
+
public void release() {
if (this.once != null) {
this.once.release();
@@ -117,20 +130,23 @@ public class ResponseFuture {
return opaque;
}
- public Channel getProcessChannel() {
- return processChannel;
+ public RemotingCommand getRequestCommand() {
+ return request;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public boolean isInterrupted() {
+ return interrupted;
}
@Override
public String toString() {
- return "ResponseFuture [responseCommand=" + responseCommand
- + ", sendRequestOK=" + sendRequestOK
- + ", cause=" + cause
- + ", opaque=" + opaque
- + ", processChannel=" + processChannel
- + ", timeoutMillis=" + timeoutMillis
- + ", invokeCallback=" + invokeCallback
- + ", beginTimestamp=" + beginTimestamp
+ return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
+ + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+ + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+ ", countDownLatch=" + countDownLatch + "]";
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index a9e8415..7d614cf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -123,8 +123,7 @@ public class RemotingCommand {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
}
- public static RemotingCommand buildErrorResponse(int code, String remark,
- Class<? extends CommandCustomHeader> classHeader) {
+ public static RemotingCommand buildErrorResponse(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(classHeader);
response.setCode(code);
response.setRemark(remark);
@@ -256,9 +255,8 @@ public class RemotingCommand {
this.customHeader = customHeader;
}
- public CommandCustomHeader decodeCommandCustomHeader(
- Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
- CommandCustomHeader objectHeader;
+ public <T extends CommandCustomHeader> T decodeCommandCustomHeader(Class<T> classHeader) throws RemotingCommandException {
+ T objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index e378a7b..a538d4b 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -43,15 +43,14 @@ public class RemotingServerTest {
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
- remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {
+ remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress());
return request;
}
- @Override
- public boolean rejectRequest() {
+ @Override public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java
new file mode 100644
index 0000000..c3f4596
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+/**
+ * Exercises a sub remoting server created from a parent {@link RemotingServer} through
+ * {@code newRemotingServer(port)}: a request code registered only on the sub server is
+ * rejected by the parent, served by the sub server, and times out once the sub server has
+ * been removed from the parent and shut down.
+ */
+public class SubRemotingServerTest {
+    private static final int subServerPort = 1234;
+    /** Address of the sub server, built from the port so the two cannot drift apart. */
+    private static final String subServerAddr = "localhost:" + subServerPort;
+
+    private static RemotingServer remotingServer;
+    private static RemotingClient remotingClient;
+    /** Non-null while the sub server is running; cleared by {@link #shutdownSubServer()}. */
+    private static RemotingServer subServer;
+
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        remotingServer = RemotingServerTest.createRemotingServer();
+        remotingClient = RemotingServerTest.createRemotingClient();
+        subServer = createSubRemotingServer(remotingServer);
+    }
+
+    @AfterClass
+    public static void destroy() {
+        // The test normally stops the sub server itself; this covers a failure before
+        // that step so the sub server is still stopped explicitly.
+        shutdownSubServer();
+        remotingClient.shutdown();
+        remotingServer.shutdown();
+    }
+
+    /**
+     * Removes the sub server from its parent and shuts it down; does nothing if that
+     * has already happened.
+     */
+    private static void shutdownSubServer() {
+        if (subServer == null) {
+            return;
+        }
+        remotingServer.removeRemotingServer(subServerPort);
+        subServer.shutdown();
+        subServer = null;
+    }
+
+    /**
+     * Creates a sub server on {@code subServerPort} from {@code parentServer}, registers a
+     * processor for request code 1 and starts it. The processor returns the request itself
+     * with its remark set to the port parsed from the receiving channel's local address.
+     *
+     * @param parentServer server whose {@code newRemotingServer} creates the sub server
+     * @return the started sub server
+     */
+    public static RemotingServer createSubRemotingServer(RemotingServer parentServer) {
+        RemotingServer server = parentServer.newRemotingServer(subServerPort);
+        server.registerProcessor(1, new NettyRequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+                final RemotingCommand request) throws Exception {
+                request.setRemark(String.valueOf(RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress())));
+                return request;
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, null);
+        server.start();
+        return server;
+    }
+
+    /**
+     * Sends request code 1 to the parent (expects REQUEST_CODE_NOT_SUPPORTED) and to the sub
+     * server (expects a remark equal to the sub server port), sends request code 0 to the sub
+     * server (expects REQUEST_CODE_NOT_SUPPORTED), then stops the sub server and expects a timeout.
+     */
+    @Test
+    public void testInvokeSubRemotingServer() throws InterruptedException, RemotingTimeoutException, RemotingConnectException, RemotingSendRequestException {
+        RequestHeader requestHeader = new RequestHeader();
+        requestHeader.setCount(1);
+        requestHeader.setMessageTitle("Welcome");
+
+        // Parent remoting server doesn't support RequestCode 1
+        RemotingCommand request = RemotingCommand.createRequestCommand(1, requestHeader);
+        RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+
+        // Issue request to SubRemotingServer
+        response = remotingClient.invokeSync(subServerAddr, request, 1000 * 3);
+        assertThat(response).isNotNull();
+        assertThat(response.getExtFields()).hasSize(2);
+        assertThat(response.getRemark()).isEqualTo(String.valueOf(subServerPort));
+
+        // Issue unsupported request to SubRemotingServer
+        request.setCode(0);
+        response = remotingClient.invokeSync(subServerAddr, request, 1000 * 3);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+
+        // Issue request to a closed SubRemotingServer
+        request.setCode(1);
+        shutdownSubServer();
+        try {
+            remotingClient.invokeSync(subServerAddr, request, 1000 * 3);
+            failBecauseExceptionWasNotThrown(RemotingTimeoutException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(RemotingTimeoutException.class);
+        }
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index a272d21..58aac7d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -37,7 +37,7 @@ public class NettyRemotingAbstractTest {
@Test
public void testProcessResponseCommand() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
- ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+ ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
@Override
public void operationComplete(final ResponseFuture responseFuture) {
assertThat(semaphore.availablePermits()).isEqualTo(0);
@@ -58,7 +58,7 @@ public class NettyRemotingAbstractTest {
@Test
public void testProcessResponseCommand_NullCallBack() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
- ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, null,
+ ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, null,
new SemaphoreReleaseOnlyOnce(semaphore));
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -73,7 +73,7 @@ public class NettyRemotingAbstractTest {
@Test
public void testProcessResponseCommand_RunCallBackInCurrentThread() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
- ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+ ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
@Override
public void operationComplete(final ResponseFuture responseFuture) {
assertThat(semaphore.availablePermits()).isEqualTo(0);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index ee6451d..f7ef585 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Set;
import com.alibaba.fastjson.JSON;
+import java.util.HashSet;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;