You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/07 01:14:32 UTC

[rocketmq] 06/14: feat(controller): implement logic about dealing with UpdateBrokerAddress event

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 56578f99fbde9c759d095ef5fea24ab1f5d655de
Author: TheR1sing3un <th...@163.com>
AuthorDate: Sun Feb 5 14:54:05 2023 +0800

    feat(controller): implement logic about dealing with UpdateBrokerAddress event
    
    1. implement logic about dealing with UpdateBrokerAddress event
---
 .../controller/impl/manager/BrokerReplicaInfo.java     |  7 +++++++
 .../controller/impl/manager/ReplicasInfoManager.java   | 18 ++++++++++++------
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index abfaf275c..24e67bf1e 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -93,4 +93,11 @@ public class BrokerReplicaInfo {
         }
         return null;
     }
+
+    public void updateBrokerAddress(final Long brokerId, final String brokerAddress) {
+        Pair<String, String> oldPair = this.brokerIdInfo.get(brokerId);
+        if (oldPair != null) {
+            this.brokerIdInfo.put(brokerId, new Pair<>(brokerAddress, oldPair.getObject2()));
+        }
+    }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 7eca573cf..1400932e0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -57,12 +56,9 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBro
 import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 
-import javax.naming.ldap.Control;
 
 /**
  * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
@@ -484,6 +480,9 @@ public class ReplicasInfoManager {
             case CLEAN_BROKER_DATA_EVENT:
                 handleCleanBrokerDataEvent((CleanBrokerDataEvent) event);
                 break;
+            case UPDATE_BROKER_ADDRESS:
+                handleUpdateBrokerAddress((UpdateBrokerAddressEvent) event);
+                break;
             default:
                 break;
         }
@@ -509,8 +508,7 @@ public class ReplicasInfoManager {
             // Initialize the replicaInfo about this broker set
             final String clusterName = event.getClusterName();
             final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName);
-            long brokerId = brokerReplicaInfo.newBrokerId();
-            brokerReplicaInfo.addBroker(brokerId, event.getBrokerAddress(), event.getRegisterCheckCode());
+            brokerReplicaInfo.addBroker(event.getNewBrokerId(), event.getBrokerAddress(), event.getRegisterCheckCode());
             this.replicaInfoTable.put(brokerName, brokerReplicaInfo);
             final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName);
             // Initialize an empty syncStateInfo for this broker set
@@ -518,6 +516,14 @@ public class ReplicasInfoManager {
         }
     }
 
+    private void handleUpdateBrokerAddress(final UpdateBrokerAddressEvent event) {
+        final String brokerName = event.getBrokerName();
+        final String brokerAddress = event.getBrokerAddress();
+        final Long brokerId = event.getBrokerId();
+        BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+        brokerReplicaInfo.updateBrokerAddress(brokerId, brokerAddress);
+    }
+
     private void handleElectMaster(final ElectMasterEvent event) {
         final String brokerName = event.getBrokerName();
         final Long newMaster = event.getNewMasterBrokerId();