You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/09 08:24:35 UTC

[rocketmq] branch 5.0.0-beta updated: [ISSUE #3798] Support container in DLedger. (#4100)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new bb9d6e62b [ISSUE #3798] Support container in DLedger. (#4100)
bb9d6e62b is described below

commit bb9d6e62b96bcc349ef6ade64fb014567b23b968
Author: cserwen <cs...@163.com>
AuthorDate: Sat Apr 9 03:24:16 2022 -0500

    [ISSUE #3798] Support container in DLedger. (#4100)
    
    * support container in DLedger mode
    
    * use BrokerIdentity to replace BrokerConfig
    
    * add logic to support remove dLedger Broker
    
    * add unit test for add&remove dLedger broker
    
    * modify mqadmin to adapt to dLedger
---
 .../apache/rocketmq/container/BrokerContainer.java | 56 ++++++++++++++++++++--
 .../container/BrokerContainerProcessor.java        | 55 +++++++++++----------
 .../logback/BrokerLogbackConfigurator.java         | 26 +++++-----
 .../rocketmq/container/BrokerContainerTest.java    | 30 ++++++++++++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  2 +-
 .../command/container/RemoveBrokerSubCommand.java  |  2 +-
 6 files changed, 127 insertions(+), 44 deletions(-)

diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index 50b8bc49e..db6672beb 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -66,6 +66,7 @@ public class BrokerContainer implements IBrokerContainer {
 
     private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
     private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<BrokerIdentity, InnerBrokerController> dLedgerBrokerControllers = new ConcurrentHashMap<>();
     private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
     private final BrokerContainerProcessor brokerContainerProcessor;
     private final Configuration configuration;
@@ -221,6 +222,10 @@ public class BrokerContainer implements IBrokerContainer {
 
         masterBrokerControllers.clear();
 
+        // Shutdown dLedger brokers
+        dLedgerBrokerControllers.values().forEach(InnerBrokerController::shutdown);
+        dLedgerBrokerControllers.clear();
+
         // Shutdown the remoting server with a high priority to avoid further traffic
         if (this.remotingServer != null) {
             this.remotingServer.shutdown();
@@ -258,16 +263,51 @@ public class BrokerContainer implements IBrokerContainer {
     @Override
     public InnerBrokerController addBroker(final BrokerConfig brokerConfig,
         final MessageStoreConfig storeConfig) throws Exception {
-        if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
-            return this.addMasterBroker(brokerConfig, storeConfig);
-        }
-        if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
-            return this.addSlaveBroker(brokerConfig, storeConfig);
+        if (storeConfig.isEnableDLegerCommitLog()) {
+            return this.addDLedgerBroker(brokerConfig, storeConfig);
+        } else {
+            if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
+                return this.addMasterBroker(brokerConfig, storeConfig);
+            }
+            if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
+                return this.addSlaveBroker(brokerConfig, storeConfig);
+            }
         }
 
         return null;
     }
 
+    public InnerBrokerController addDLedgerBroker(final BrokerConfig brokerConfig, final MessageStoreConfig storeConfig) throws Exception {
+        brokerConfig.setInBrokerContainer(true);
+        if (storeConfig.isDuplicationEnable()) {
+            LOG.error("Can not add broker to container when duplicationEnable is true currently");
+            throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+        }
+        InnerBrokerController brokerController = new InnerBrokerController(this, brokerConfig, storeConfig);
+        BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+                brokerConfig.getBrokerName(), Integer.parseInt(storeConfig.getdLegerSelfId().substring(1)));
+        final BrokerController previousBroker = dLedgerBrokerControllers.putIfAbsent(brokerIdentity, brokerController);
+        if (previousBroker == null) {
+            // New dLedger broker added, start it
+            try {
+                BrokerLogbackConfigurator.doConfigure(brokerIdentity);
+                final boolean initResult = brokerController.initialize();
+                if (!initResult) {
+                    brokerController.shutdown();
+                    dLedgerBrokerControllers.remove(brokerIdentity);
+                    throw new Exception("Failed to init dLedger broker " + brokerIdentity.getCanonicalName());
+                }
+            } catch (Exception e) {
+                // Remove the failed dLedger broker and throw the exception
+                brokerController.shutdown();
+                dLedgerBrokerControllers.remove(brokerIdentity);
+                throw new Exception("Failed to initialize dLedger broker " + brokerIdentity.getCanonicalName(), e);
+            }
+            return brokerController;
+        }
+        throw new Exception(brokerIdentity.getCanonicalName() + " has already been added to current broker");
+    }
+
     public InnerBrokerController addMasterBroker(final BrokerConfig masterBrokerConfig,
         final MessageStoreConfig storeConfig) throws Exception {
 
@@ -356,6 +396,12 @@ public class BrokerContainer implements IBrokerContainer {
     @Override
     public BrokerController removeBroker(final BrokerIdentity brokerIdentity) throws Exception {
 
+        InnerBrokerController dLedgerController = dLedgerBrokerControllers.remove(brokerIdentity);
+        if (dLedgerController != null) {
+            dLedgerController.shutdown();
+            return dLedgerController;
+        }
+
         InnerSalveBrokerController slaveBroker = slaveBrokerControllers.remove(brokerIdentity);
         if (slaveBroker != null) {
             slaveBroker.shutdown();
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
index 6893882ec..2512a3307 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -116,29 +116,31 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
             brokerConfig.setBrokerConfigPath(configPath);
         }
 
-        switch (messageStoreConfig.getBrokerRole()) {
-            case ASYNC_MASTER:
-            case SYNC_MASTER:
-                brokerConfig.setBrokerId(MixAll.MASTER_ID);
-                break;
-            case SLAVE:
-                if (brokerConfig.getBrokerId() <= 0) {
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("slave broker id must be > 0");
-                    return response;
-                }
-                break;
-            default:
-                break;
+        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
+            switch (messageStoreConfig.getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                    break;
+                case SLAVE:
+                    if (brokerConfig.getBrokerId() <= 0) {
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("slave broker id must be > 0");
+                        return response;
+                    }
+                    break;
+                default:
+                    break;
 
-        }
+            }
 
-        if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
-            || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
-            || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("invalid replicas number");
-            return response;
+            if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+                    || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+                    || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("invalid replicas number");
+                return response;
+            }
         }
 
         BrokerController brokerController;
@@ -163,9 +165,14 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
                 }
             } catch (Exception e) {
                 LOGGER.error("start broker exception {}", e);
-                BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
-                    brokerConfig.getBrokerName(),
-                    brokerConfig.getBrokerId());
+                BrokerIdentity brokerIdentity;
+                if (messageStoreConfig.isEnableDLegerCommitLog()) {
+                    brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+                        brokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1)));
+                } else {
+                    brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+                        brokerConfig.getBrokerName(), brokerConfig.getBrokerId());
+                }
                 this.brokerContainer.removeBroker(brokerIdentity);
                 brokerController.shutdown();
                 response.setCode(ResponseCode.SYSTEM_ERROR);
diff --git a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
index d4b4a75d0..45cbe9367 100644
--- a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
+++ b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
@@ -36,7 +36,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import ch.qos.logback.core.util.FileSize;
-import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -53,8 +53,8 @@ public class BrokerLogbackConfigurator {
     public static final String SUFFIX_APPENDER = "Appender";
     public static final String SUFFIX_INNER_APPENDER = "_inner";
 
-    public static void doConfigure(BrokerConfig brokerConfig) {
-        if (!CONFIGURED_BROKER_LIST.contains(brokerConfig.getCanonicalName())) {
+    public static void doConfigure(BrokerIdentity brokerIdentity) {
+        if (!CONFIGURED_BROKER_LIST.contains(brokerIdentity.getCanonicalName())) {
             try {
                 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
                 for (ch.qos.logback.classic.Logger tempLogger : lc.getLoggerList()) {
@@ -64,7 +64,7 @@ public class BrokerLogbackConfigurator {
                         && !loggerName.equals(LoggerName.ACCOUNT_LOGGER_NAME)
                         && !loggerName.equals(LoggerName.COMMERCIAL_LOGGER_NAME)
                         && !loggerName.equals(LoggerName.CONSUMER_STATS_LOGGER_NAME)) {
-                        ch.qos.logback.classic.Logger logger = lc.getLogger(brokerConfig.getLoggerIdentifier() + loggerName);
+                        ch.qos.logback.classic.Logger logger = lc.getLogger(brokerIdentity.getLoggerIdentifier() + loggerName);
                         logger.setAdditive(tempLogger.isAdditive());
                         logger.setLevel(tempLogger.getLevel());
                         String appenderName = loggerName + SUFFIX_APPENDER;
@@ -72,7 +72,7 @@ public class BrokerLogbackConfigurator {
                         if (tempAppender instanceof AsyncAppender) {
                             AsyncAppender tempAsyncAppender = (AsyncAppender) tempAppender;
                             AsyncAppender asyncAppender = new AsyncAppender();
-                            asyncAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+                            asyncAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
                             asyncAppender.setContext(tempAsyncAppender.getContext());
 
                             String innerAppenderName = appenderName + SUFFIX_INNER_APPENDER;
@@ -81,34 +81,34 @@ public class BrokerLogbackConfigurator {
                                 continue;
                             }
                             asyncAppender.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempInnerAppender,
-                                brokerConfig, innerAppenderName));
+                                brokerIdentity, innerAppenderName));
                             asyncAppender.start();
                             logger.addAppender(asyncAppender);
                         } else if (tempAppender instanceof RollingFileAppender) {
                             logger.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempAppender,
-                                brokerConfig, appenderName));
+                                brokerIdentity, appenderName));
                         }
                     }
                 }
             } catch (Exception e) {
-                LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerConfig.getCanonicalName(), e);
+                LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerIdentity.getCanonicalName(), e);
                 return;
             }
 
-            CONFIGURED_BROKER_LIST.add(brokerConfig.getCanonicalName());
+            CONFIGURED_BROKER_LIST.add(brokerIdentity.getCanonicalName());
         }
     }
 
     private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
-        RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerConfig brokerConfig, String appenderName)
+        RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerIdentity brokerIdentity, String appenderName)
         throws NoSuchFieldException, IllegalAccessException {
         RollingFileAppender<ILoggingEvent> rollingFileAppender = new RollingFileAppender<>();
 
         // configure appender name
-        rollingFileAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+        rollingFileAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
 
         // configure file name
-        rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+        rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
 
         // configure append
         rollingFileAppender.setAppend(true);
@@ -144,7 +144,7 @@ public class BrokerLogbackConfigurator {
             FixedWindowRollingPolicy tempRollingPolicy = (FixedWindowRollingPolicy) originalRollingPolicy;
             FixedWindowRollingPolicy rollingPolicy = new FixedWindowRollingPolicy();
             rollingPolicy.setContext(tempRollingPolicy.getContext());
-            rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
+            rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
             rollingPolicy.setMaxIndex(tempRollingPolicy.getMaxIndex());
             rollingPolicy.setMinIndex(tempRollingPolicy.getMinIndex());
             rollingPolicy.setParent(rollingFileAppender);
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
index 42c934d73..d63446e99 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -241,6 +241,36 @@ public class BrokerContainerTest {
         master.getMessageStore().destroy();
     }
 
+    @Test
+    public void testAddAndRemoveDLedgerBroker() throws Exception {
+        BrokerContainer brokerContainer = new BrokerContainer(
+                new BrokerContainerConfig(),
+                new NettyServerConfig(),
+                new NettyClientConfig());
+        assertThat(brokerContainer.initialize()).isTrue();
+        brokerContainer.start();
+
+        BrokerConfig dLedgerBrokerConfig = new BrokerConfig();
+        String baseDir = createBaseDir("unnittest-dLedger").getAbsolutePath();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(baseDir);
+        messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        messageStoreConfig.setEnableDLegerCommitLog(true);
+        messageStoreConfig.setdLegerSelfId("n0");
+        messageStoreConfig.setdLegerGroup("group");
+        messageStoreConfig.setdLegerPeers(String.format("n0-localhost:%d", generatePort(30900, 10000)));
+        InnerBrokerController dLedger = brokerContainer.addBroker(dLedgerBrokerConfig, messageStoreConfig);
+        assertThat(dLedger).isNotNull();
+        dLedger.start();
+        assertThat(dLedger.isIsolated()).isFalse();
+
+        brokerContainer.removeBroker(new BrokerIdentity(dLedgerBrokerConfig.getBrokerClusterName(), dLedgerBrokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1))));
+        assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+
+        brokerContainer.shutdown();
+        dLedger.getMessageStore().destroy();
+    }
+
     @Test
     public void testAddAndRemoveSlaveSuccess() throws Exception {
         BrokerContainer brokerContainer = new BrokerContainer(
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index dfd33259b..7b78ead3a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -206,7 +206,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
     @Override public void addBrokerToContainer(String brokerContainerAddr,
         String brokerConfig) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
-        this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, timeoutMillis);
+        this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, 20000);
     }
 
     @Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
index 9907d8add..86990f61c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
@@ -39,7 +39,7 @@ public class RemoveBrokerSubCommand implements SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId");
+        opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId(dLedgerId for dLedger)");
         opt.setRequired(true);
         options.addOption(opt);