You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/07 01:14:32 UTC
[rocketmq] 06/14: feat(controller): implement logic about dealing with UpdateBrokerAddress event
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 56578f99fbde9c759d095ef5fea24ab1f5d655de
Author: TheR1sing3un <th...@163.com>
AuthorDate: Sun Feb 5 14:54:05 2023 +0800
feat(controller): implement logic about dealing with UpdateBrokerAddress event
1. Implement the handling logic for the UpdateBrokerAddress event
---
.../controller/impl/manager/BrokerReplicaInfo.java | 7 +++++++
.../controller/impl/manager/ReplicasInfoManager.java | 18 ++++++++++++------
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index abfaf275c..24e67bf1e 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -93,4 +93,11 @@ public class BrokerReplicaInfo {
}
return null;
}
+
+    /**
+     * Replaces the stored address of an already known broker id, carrying over the second element
+     * of its existing pair unchanged. When {@code brokerId} has no entry in {@code brokerIdInfo},
+     * nothing is modified and no new entry is created.
+     *
+     * @param brokerId      id of the broker whose address is replaced
+     * @param brokerAddress address to store as the first element of the pair
+     */
+    public void updateBrokerAddress(final Long brokerId, final String brokerAddress) {
+        // Rewrite only an existing mapping; an unknown id is left absent.
+        this.brokerIdInfo.computeIfPresent(brokerId,
+            (id, current) -> new Pair<>(brokerAddress, current.getObject2()));
+    }
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 7eca573cf..1400932e0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -57,12 +56,9 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBro
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
-import javax.naming.ldap.Control;
/**
* The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
@@ -484,6 +480,9 @@ public class ReplicasInfoManager {
case CLEAN_BROKER_DATA_EVENT:
handleCleanBrokerDataEvent((CleanBrokerDataEvent) event);
break;
+ case UPDATE_BROKER_ADDRESS:
+ handleUpdateBrokerAddress((UpdateBrokerAddressEvent) event);
+ break;
default:
break;
}
@@ -509,8 +508,7 @@ public class ReplicasInfoManager {
// Initialize the replicaInfo about this broker set
final String clusterName = event.getClusterName();
final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName);
- long brokerId = brokerReplicaInfo.newBrokerId();
- brokerReplicaInfo.addBroker(brokerId, event.getBrokerAddress(), event.getRegisterCheckCode());
+ brokerReplicaInfo.addBroker(event.getNewBrokerId(), event.getBrokerAddress(), event.getRegisterCheckCode());
this.replicaInfoTable.put(brokerName, brokerReplicaInfo);
final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName);
// Initialize an empty syncStateInfo for this broker set
@@ -518,6 +516,14 @@ public class ReplicasInfoManager {
}
}
+    /**
+     * Applies an {@code UpdateBrokerAddressEvent}: looks up the replica info registered under the
+     * event's broker name and replaces the address recorded for the event's broker id.
+     *
+     * <p>If no replica info is stored for that broker name the event is ignored, so that applying
+     * it cannot fail with a {@link NullPointerException}.
+     *
+     * @param event carries the broker name, the broker id and the new broker address
+     */
+    private void handleUpdateBrokerAddress(final UpdateBrokerAddressEvent event) {
+        final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(event.getBrokerName());
+        if (brokerReplicaInfo == null) {
+            // Unknown broker set: there is no entry whose address could be updated.
+            return;
+        }
+        brokerReplicaInfo.updateBrokerAddress(event.getBrokerId(), event.getBrokerAddress());
+    }
+
private void handleElectMaster(final ElectMasterEvent event) {
final String brokerName = event.getBrokerName();
final Long newMaster = event.getNewMasterBrokerId();