You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/09 08:24:35 UTC
[rocketmq] branch 5.0.0-beta updated: [ISSUE #3798] Support container in DLedger. (#4100)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new bb9d6e62b [ISSUE #3798] Support container in DLedger. (#4100)
bb9d6e62b is described below
commit bb9d6e62b96bcc349ef6ade64fb014567b23b968
Author: cserwen <cs...@163.com>
AuthorDate: Sat Apr 9 03:24:16 2022 -0500
[ISSUE #3798] Support container in DLedger. (#4100)
* support container in DLedger Model
* use BrokerIdentity to replace BrokerConfig
* add logic to support remove dLedger Broker
* add unit test for add&remove dLedger broker
* modify mqadmin to adapt to dLedger
---
.../apache/rocketmq/container/BrokerContainer.java | 56 ++++++++++++++++++++--
.../container/BrokerContainerProcessor.java | 55 +++++++++++----------
.../logback/BrokerLogbackConfigurator.java | 26 +++++-----
.../rocketmq/container/BrokerContainerTest.java | 30 ++++++++++++
.../tools/admin/DefaultMQAdminExtImpl.java | 2 +-
.../command/container/RemoveBrokerSubCommand.java | 2 +-
6 files changed, 127 insertions(+), 44 deletions(-)
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index 50b8bc49e..db6672beb 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -66,6 +66,7 @@ public class BrokerContainer implements IBrokerContainer {
private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<BrokerIdentity, InnerBrokerController> dLedgerBrokerControllers = new ConcurrentHashMap<>();
private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
private final BrokerContainerProcessor brokerContainerProcessor;
private final Configuration configuration;
@@ -221,6 +222,10 @@ public class BrokerContainer implements IBrokerContainer {
masterBrokerControllers.clear();
+ // Shutdown dLedger brokers
+ dLedgerBrokerControllers.values().forEach(InnerBrokerController::shutdown);
+ dLedgerBrokerControllers.clear();
+
// Shutdown the remoting server with a high priority to avoid further traffic
if (this.remotingServer != null) {
this.remotingServer.shutdown();
@@ -258,16 +263,51 @@ public class BrokerContainer implements IBrokerContainer {
@Override
public InnerBrokerController addBroker(final BrokerConfig brokerConfig,
final MessageStoreConfig storeConfig) throws Exception {
- if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
- return this.addMasterBroker(brokerConfig, storeConfig);
- }
- if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
- return this.addSlaveBroker(brokerConfig, storeConfig);
+ if (storeConfig.isEnableDLegerCommitLog()) {
+ return this.addDLedgerBroker(brokerConfig, storeConfig);
+ } else {
+ if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
+ return this.addMasterBroker(brokerConfig, storeConfig);
+ }
+ if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
+ return this.addSlaveBroker(brokerConfig, storeConfig);
+ }
}
return null;
}
+ public InnerBrokerController addDLedgerBroker(final BrokerConfig brokerConfig, final MessageStoreConfig storeConfig) throws Exception {
+ brokerConfig.setInBrokerContainer(true);
+ if (storeConfig.isDuplicationEnable()) {
+ LOG.error("Can not add broker to container when duplicationEnable is true currently");
+ throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+ }
+ InnerBrokerController brokerController = new InnerBrokerController(this, brokerConfig, storeConfig);
+ BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+ brokerConfig.getBrokerName(), Integer.parseInt(storeConfig.getdLegerSelfId().substring(1)));
+ final BrokerController previousBroker = dLedgerBrokerControllers.putIfAbsent(brokerIdentity, brokerController);
+ if (previousBroker == null) {
+ // New dLedger broker added, start it
+ try {
+ BrokerLogbackConfigurator.doConfigure(brokerIdentity);
+ final boolean initResult = brokerController.initialize();
+ if (!initResult) {
+ brokerController.shutdown();
+ dLedgerBrokerControllers.remove(brokerIdentity);
+ throw new Exception("Failed to init dLedger broker " + brokerIdentity.getCanonicalName());
+ }
+ } catch (Exception e) {
+ // Remove the failed dLedger broker and throw the exception
+ brokerController.shutdown();
+ dLedgerBrokerControllers.remove(brokerIdentity);
+ throw new Exception("Failed to initialize dLedger broker " + brokerIdentity.getCanonicalName(), e);
+ }
+ return brokerController;
+ }
+ throw new Exception(brokerIdentity.getCanonicalName() + " has already been added to current broker");
+ }
+
public InnerBrokerController addMasterBroker(final BrokerConfig masterBrokerConfig,
final MessageStoreConfig storeConfig) throws Exception {
@@ -356,6 +396,12 @@ public class BrokerContainer implements IBrokerContainer {
@Override
public BrokerController removeBroker(final BrokerIdentity brokerIdentity) throws Exception {
+ InnerBrokerController dLedgerController = dLedgerBrokerControllers.remove(brokerIdentity);
+ if (dLedgerController != null) {
+ dLedgerController.shutdown();
+ return dLedgerController;
+ }
+
InnerSalveBrokerController slaveBroker = slaveBrokerControllers.remove(brokerIdentity);
if (slaveBroker != null) {
slaveBroker.shutdown();
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
index 6893882ec..2512a3307 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -116,29 +116,31 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
brokerConfig.setBrokerConfigPath(configPath);
}
- switch (messageStoreConfig.getBrokerRole()) {
- case ASYNC_MASTER:
- case SYNC_MASTER:
- brokerConfig.setBrokerId(MixAll.MASTER_ID);
- break;
- case SLAVE:
- if (brokerConfig.getBrokerId() <= 0) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("slave broker id must be > 0");
- return response;
- }
- break;
- default:
- break;
+ if (!messageStoreConfig.isEnableDLegerCommitLog()) {
+ switch (messageStoreConfig.getBrokerRole()) {
+ case ASYNC_MASTER:
+ case SYNC_MASTER:
+ brokerConfig.setBrokerId(MixAll.MASTER_ID);
+ break;
+ case SLAVE:
+ if (brokerConfig.getBrokerId() <= 0) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("slave broker id must be > 0");
+ return response;
+ }
+ break;
+ default:
+ break;
- }
+ }
- if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
- || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
- || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("invalid replicas number");
- return response;
+ if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+ || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+ || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("invalid replicas number");
+ return response;
+ }
}
BrokerController brokerController;
@@ -163,9 +165,14 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
}
} catch (Exception e) {
LOGGER.error("start broker exception {}", e);
- BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
- brokerConfig.getBrokerName(),
- brokerConfig.getBrokerId());
+ BrokerIdentity brokerIdentity;
+ if (messageStoreConfig.isEnableDLegerCommitLog()) {
+ brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+ brokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1)));
+ } else {
+ brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+ brokerConfig.getBrokerName(), brokerConfig.getBrokerId());
+ }
this.brokerContainer.removeBroker(brokerIdentity);
brokerController.shutdown();
response.setCode(ResponseCode.SYSTEM_ERROR);
diff --git a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
index d4b4a75d0..45cbe9367 100644
--- a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
+++ b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
@@ -36,7 +36,7 @@ import java.util.HashSet;
import java.util.Set;
import ch.qos.logback.core.util.FileSize;
-import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -53,8 +53,8 @@ public class BrokerLogbackConfigurator {
public static final String SUFFIX_APPENDER = "Appender";
public static final String SUFFIX_INNER_APPENDER = "_inner";
- public static void doConfigure(BrokerConfig brokerConfig) {
- if (!CONFIGURED_BROKER_LIST.contains(brokerConfig.getCanonicalName())) {
+ public static void doConfigure(BrokerIdentity brokerIdentity) {
+ if (!CONFIGURED_BROKER_LIST.contains(brokerIdentity.getCanonicalName())) {
try {
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
for (ch.qos.logback.classic.Logger tempLogger : lc.getLoggerList()) {
@@ -64,7 +64,7 @@ public class BrokerLogbackConfigurator {
&& !loggerName.equals(LoggerName.ACCOUNT_LOGGER_NAME)
&& !loggerName.equals(LoggerName.COMMERCIAL_LOGGER_NAME)
&& !loggerName.equals(LoggerName.CONSUMER_STATS_LOGGER_NAME)) {
- ch.qos.logback.classic.Logger logger = lc.getLogger(brokerConfig.getLoggerIdentifier() + loggerName);
+ ch.qos.logback.classic.Logger logger = lc.getLogger(brokerIdentity.getLoggerIdentifier() + loggerName);
logger.setAdditive(tempLogger.isAdditive());
logger.setLevel(tempLogger.getLevel());
String appenderName = loggerName + SUFFIX_APPENDER;
@@ -72,7 +72,7 @@ public class BrokerLogbackConfigurator {
if (tempAppender instanceof AsyncAppender) {
AsyncAppender tempAsyncAppender = (AsyncAppender) tempAppender;
AsyncAppender asyncAppender = new AsyncAppender();
- asyncAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+ asyncAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
asyncAppender.setContext(tempAsyncAppender.getContext());
String innerAppenderName = appenderName + SUFFIX_INNER_APPENDER;
@@ -81,34 +81,34 @@ public class BrokerLogbackConfigurator {
continue;
}
asyncAppender.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempInnerAppender,
- brokerConfig, innerAppenderName));
+ brokerIdentity, innerAppenderName));
asyncAppender.start();
logger.addAppender(asyncAppender);
} else if (tempAppender instanceof RollingFileAppender) {
logger.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempAppender,
- brokerConfig, appenderName));
+ brokerIdentity, appenderName));
}
}
}
} catch (Exception e) {
- LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerConfig.getCanonicalName(), e);
+ LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerIdentity.getCanonicalName(), e);
return;
}
- CONFIGURED_BROKER_LIST.add(brokerConfig.getCanonicalName());
+ CONFIGURED_BROKER_LIST.add(brokerIdentity.getCanonicalName());
}
}
private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
- RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerConfig brokerConfig, String appenderName)
+ RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerIdentity brokerIdentity, String appenderName)
throws NoSuchFieldException, IllegalAccessException {
RollingFileAppender<ILoggingEvent> rollingFileAppender = new RollingFileAppender<>();
// configure appender name
- rollingFileAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+ rollingFileAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
// configure file name
- rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+ rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
// configure append
rollingFileAppender.setAppend(true);
@@ -144,7 +144,7 @@ public class BrokerLogbackConfigurator {
FixedWindowRollingPolicy tempRollingPolicy = (FixedWindowRollingPolicy) originalRollingPolicy;
FixedWindowRollingPolicy rollingPolicy = new FixedWindowRollingPolicy();
rollingPolicy.setContext(tempRollingPolicy.getContext());
- rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+ rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
rollingPolicy.setMaxIndex(tempRollingPolicy.getMaxIndex());
rollingPolicy.setMinIndex(tempRollingPolicy.getMinIndex());
rollingPolicy.setParent(rollingFileAppender);
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
index 42c934d73..d63446e99 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -241,6 +241,36 @@ public class BrokerContainerTest {
master.getMessageStore().destroy();
}
+ @Test
+ public void testAddAndRemoveDLedgerBroker() throws Exception {
+ BrokerContainer brokerContainer = new BrokerContainer(
+ new BrokerContainerConfig(),
+ new NettyServerConfig(),
+ new NettyClientConfig());
+ assertThat(brokerContainer.initialize()).isTrue();
+ brokerContainer.start();
+
+ BrokerConfig dLedgerBrokerConfig = new BrokerConfig();
+ String baseDir = createBaseDir("unnittest-dLedger").getAbsolutePath();
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setStorePathRootDir(baseDir);
+ messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+ messageStoreConfig.setEnableDLegerCommitLog(true);
+ messageStoreConfig.setdLegerSelfId("n0");
+ messageStoreConfig.setdLegerGroup("group");
+ messageStoreConfig.setdLegerPeers(String.format("n0-localhost:%d", generatePort(30900, 10000)));
+ InnerBrokerController dLedger = brokerContainer.addBroker(dLedgerBrokerConfig, messageStoreConfig);
+ assertThat(dLedger).isNotNull();
+ dLedger.start();
+ assertThat(dLedger.isIsolated()).isFalse();
+
+ brokerContainer.removeBroker(new BrokerIdentity(dLedgerBrokerConfig.getBrokerClusterName(), dLedgerBrokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1))));
+ assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+
+ brokerContainer.shutdown();
+ dLedger.getMessageStore().destroy();
+ }
+
@Test
public void testAddAndRemoveSlaveSuccess() throws Exception {
BrokerContainer brokerContainer = new BrokerContainer(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index dfd33259b..7b78ead3a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -206,7 +206,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override public void addBrokerToContainer(String brokerContainerAddr,
String brokerConfig) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
- this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, timeoutMillis);
+ this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, 20000);
}
@Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
index 9907d8add..86990f61c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
@@ -39,7 +39,7 @@ public class RemoveBrokerSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId");
+ opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId(dLedgerId for dLedger)");
opt.setRequired(true);
options.addOption(opt);