You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/06 08:51:26 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated (911ee340c -> ce534eea1)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a change to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from 911ee340c [Summer of Code] Support switch role for ha service (#4236)
     add 531cdb0be sync message request mode from master (#4101)
     add 7a5fff4f1 [ISSUE #4192] Fix log split not work for dLedger in container (#4193)
     add ac886bc18 Fix check style to paas CI
     add d6a53f9c2 Polish the document of SlaveActingMasterMode.md and QuorumACK.md
     add d937c1d61 [ISSUE #4245] Remove the topic route cache in nameserver
     add e55db10c6 [ISSUE #4072] fix totalPollingNum count error (#4073)
     add a4cccea9d [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
     new 541a6d012 Merge remote-tracking branch 'apache/5.0.0-beta' into 5.0.0-beta-dledger-controller
     new ce534eea1 Update some service name in ha service

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/rocketmq/broker/BrokerController.java   | 427 +++++++++++++++++----
 .../rocketmq/broker}/BrokerPreOnlineService.java   |  10 +-
 .../broker/client/ClientHousekeepingService.java   |   2 +-
 .../broker/dledger/DLedgerRoleChangeHandler.java   |   4 +-
 .../broker/loadbalance/AssignmentManager.java      |   2 +-
 .../longpolling/LmqPullRequestHoldService.java     |   3 +
 .../broker/longpolling/PullRequestHoldService.java |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  26 +-
 .../broker/processor/AdminBrokerProcessor.java     |  29 ++
 .../broker/processor/PopBufferMergeService.java    |   2 +-
 .../broker/processor/PopMessageProcessor.java      |   9 +-
 .../broker/processor/PopReviveService.java         |   2 +-
 .../broker/processor/QueryAssignmentProcessor.java |   3 +
 .../rocketmq/broker/slave/SlaveSynchronize.java    |  24 ++
 .../topic/TopicQueueMappingCleanService.java       |   4 +-
 .../AbstractTransactionalMessageCheckListener.java |   2 +-
 .../TransactionalMessageCheckService.java          |   2 +-
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |   5 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../java/org/apache/rocketmq/common/UtilAll.java   |  12 +-
 .../rocketmq/common/namesrv/NamesrvConfig.java     |  33 +-
 .../rocketmq/common/protocol/RequestCode.java      |   1 +
 ...ava => MessageRequestModeSerializeWrapper.java} |  19 +-
 .../rocketmq/common/protocol/route/BrokerData.java |   8 +
 .../MessageRequestModeSerializeWrapperTest.java    |  58 +++
 .../apache/rocketmq/container/BrokerContainer.java |   9 +-
 .../rocketmq/container/BrokerContainerConfig.java  |   9 -
 .../rocketmq/container/InnerBrokerController.java  | 210 +---------
 .../container/InnerSalveBrokerController.java      | 114 ------
 .../rocketmq/container/BrokerContainerTest.java    |  20 +-
 .../rocketmq/container/BrokerPreOnlineTest.java    |   4 +-
 docs/cn/QuorumACK.md                               |   6 +-
 docs/cn/SlaveActingMasterMode.md                   |   3 +
 .../namesrv/processor/ClientRequestProcessor.java  |   8 +-
 .../namesrv/processor/DefaultRequestProcessor.java |  68 +++-
 .../namesrv/routeinfo/RouteInfoManager.java        | 184 ++++-----
 .../rocketmq/store/AllocateMappedFileService.java  |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  |   2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  29 +-
 .../apache/rocketmq/store/StoreStatsService.java   |  12 +-
 .../apache/rocketmq/store/ha/DefaultHAClient.java  |   2 +-
 .../rocketmq/store/ha/DefaultHAConnection.java     |   4 +-
 .../rocketmq/store/ha/GroupTransferService.java    |   2 +-
 .../ha/HAConnectionStateNotificationService.java   |   2 +-
 .../store/ha/autoswitch/AutoSwitchHAClient.java    |   3 +
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  11 +-
 .../org/apache/rocketmq/store/ha/HAServerTest.java |   2 +-
 .../container/ContainerIntegrationTestBase.java    |  10 +-
 48 files changed, 784 insertions(+), 631 deletions(-)
 rename {container/src/main/java/org/apache/rocketmq/container => broker/src/main/java/org/apache/rocketmq/broker}/BrokerPreOnlineService.java (96%)
 copy common/src/main/java/org/apache/rocketmq/common/protocol/body/{QueryAssignmentResponseBody.java => MessageRequestModeSerializeWrapper.java} (55%)
 create mode 100644 common/src/test/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapperTest.java


[rocketmq] 01/02: Merge remote-tracking branch 'apache/5.0.0-beta' into 5.0.0-beta-dledger-controller

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 541a6d0124ce749197b381459b2542c0301bfaff
Merge: 911ee340c a4cccea9d
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Fri May 6 16:37:12 2022 +0800

    Merge remote-tracking branch 'apache/5.0.0-beta' into 5.0.0-beta-dledger-controller
    
    # Conflicts:
    #       store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java

 .../apache/rocketmq/broker/BrokerController.java   | 427 +++++++++++++++++----
 .../rocketmq/broker}/BrokerPreOnlineService.java   |  10 +-
 .../broker/client/ClientHousekeepingService.java   |   2 +-
 .../broker/dledger/DLedgerRoleChangeHandler.java   |   4 +-
 .../broker/loadbalance/AssignmentManager.java      |   2 +-
 .../longpolling/LmqPullRequestHoldService.java     |   3 +
 .../broker/longpolling/PullRequestHoldService.java |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  26 +-
 .../broker/processor/AdminBrokerProcessor.java     |  29 ++
 .../broker/processor/PopBufferMergeService.java    |   2 +-
 .../broker/processor/PopMessageProcessor.java      |   9 +-
 .../broker/processor/PopReviveService.java         |   2 +-
 .../broker/processor/QueryAssignmentProcessor.java |   3 +
 .../rocketmq/broker/slave/SlaveSynchronize.java    |  24 ++
 .../topic/TopicQueueMappingCleanService.java       |   4 +-
 .../AbstractTransactionalMessageCheckListener.java |   2 +-
 .../TransactionalMessageCheckService.java          |   2 +-
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |   5 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../java/org/apache/rocketmq/common/UtilAll.java   |  12 +-
 .../rocketmq/common/namesrv/NamesrvConfig.java     |  33 +-
 .../rocketmq/common/protocol/RequestCode.java      |   1 +
 .../body/MessageRequestModeSerializeWrapper.java   |  35 ++
 .../rocketmq/common/protocol/route/BrokerData.java |   8 +
 .../MessageRequestModeSerializeWrapperTest.java    |  58 +++
 .../apache/rocketmq/container/BrokerContainer.java |   9 +-
 .../rocketmq/container/BrokerContainerConfig.java  |   9 -
 .../rocketmq/container/InnerBrokerController.java  | 210 +---------
 .../container/InnerSalveBrokerController.java      | 114 ------
 .../rocketmq/container/BrokerContainerTest.java    |  20 +-
 .../rocketmq/container/BrokerPreOnlineTest.java    |   4 +-
 docs/cn/QuorumACK.md                               |   6 +-
 docs/cn/SlaveActingMasterMode.md                   |   3 +
 .../namesrv/processor/ClientRequestProcessor.java  |   8 +-
 .../namesrv/processor/DefaultRequestProcessor.java |  68 +++-
 .../namesrv/routeinfo/RouteInfoManager.java        | 184 ++++-----
 .../rocketmq/store/AllocateMappedFileService.java  |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  |   2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  29 +-
 .../apache/rocketmq/store/StoreStatsService.java   |  12 +-
 .../apache/rocketmq/store/ha/DefaultHAClient.java  |   2 +-
 .../rocketmq/store/ha/DefaultHAConnection.java     |   2 +-
 .../rocketmq/store/ha/GroupTransferService.java    |   2 +-
 .../ha/HAConnectionStateNotificationService.java   |   2 +-
 .../org/apache/rocketmq/store/ha/HAServerTest.java |   2 +-
 .../container/ContainerIntegrationTestBase.java    |  10 +-
 46 files changed, 799 insertions(+), 616 deletions(-)

diff --cc namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 89b807744,aac1259ea..516a94564
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@@ -671,21 -594,11 +602,20 @@@ public class RouteInfoManager 
                      } else {
                          reducedBroker.add(brokerName);
                      }
 +
 +                    // Check whether we need to elect a new master
 +                    if (this.namesrvController != null && this.namesrvController.getControllerConfig().isStartupController() && this.controller != null) {
 +                        if (unRegisterRequest.getBrokerId() == 0) {
 +                            this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
 +                            // Todo: Inform the master
 +                            // However, because now the broker does not have the related api, so I will complete the process in the future.
 +                        }
 +                    }
                  }
  
-                 Set<String> changedTopics = cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
-                 this.updateTopicRouteData(changedTopics);
+                 cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
  
-                 if (!needNotifyBrokerMap.isEmpty()) {
+                 if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                      notifyMinBrokerIdChanged(needNotifyBrokerMap);
                  }
              } finally {


[rocketmq] 02/02: Update some service name in ha service

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ce534eea149a422299d474faac15c73871dc7ba0
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Fri May 6 16:50:51 2022 +0800

    Update some service name in ha service
---
 .../org/apache/rocketmq/store/ha/DefaultHAConnection.java     |  2 +-
 .../rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java      |  3 +++
 .../rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java  | 11 +++++++----
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index 3b7241065..d99099844 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -432,7 +432,7 @@ public class DefaultHAConnection implements HAConnection {
         @Override
         public String getServiceName() {
             if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
-                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+                return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
             }
             return WriteSocketService.class.getSimpleName();
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 3ddd367a7..0dae7d477 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -128,6 +128,9 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
     }
 
     @Override public String getServiceName() {
+        if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+            return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + AutoSwitchHAClient.class.getSimpleName();
+        }
         return AutoSwitchHAClient.class.getSimpleName();
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 0b7ce6b39..7f9bdeba7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -39,9 +39,9 @@ import org.apache.rocketmq.store.ha.io.HAWriter;
 
 public class AutoSwitchHAConnection implements HAConnection {
     /**
-     * Header protocol in syncing msg from master.
-     * Format: current state + body size + offset + epoch  + epochStartOffset + additionalInfo(confirmOffset).
-     * If the msg is hankeShakeMsg, the body size = EpochEntrySize * EpochEntryNums, the offset is maxOffset in master.
+     * Header protocol in syncing msg from master. Format: current state + body size + offset + epoch  +
+     * epochStartOffset + additionalInfo(confirmOffset). If the msg is hankeShakeMsg, the body size = EpochEntrySize *
+     * EpochEntryNums, the offset is maxOffset in master.
      */
     public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
     public static final int EPOCH_ENTRY_SIZE = 12;
@@ -218,6 +218,9 @@ public class AutoSwitchHAConnection implements HAConnection {
 
         @Override
         public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + ReadSocketService.class.getSimpleName();
+            }
             return ReadSocketService.class.getSimpleName();
         }
 
@@ -336,7 +339,7 @@ public class AutoSwitchHAConnection implements HAConnection {
         @Override
         public String getServiceName() {
             if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
-                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+                return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
             }
             return WriteSocketService.class.getSimpleName();
         }