You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2023/01/30 08:01:35 UTC
[rocketmq] branch develop updated: [ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)
This is an automated email from the ASF dual-hosted git repository.
huitong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1e7c861c1 [ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)
1e7c861c1 is described below
commit 1e7c861c1e824ed1d7d2b1002c7078f1ece35e27
Author: rongtong <ji...@163.com>
AuthorDate: Mon Jan 30 16:01:28 2023 +0800
[ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)
* Fix the issue that the broker sends heartbeats to the controller too frequently, causing thread blocking during a network partition
* Fix ReplicasManagerTest so that it passes
* Resolve conflict with #5922
* Fix code style
* Rename isAddressCanConnect to isAddressReachable
* Rename isAddressCanConnect to isAddressReachable
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../broker/controller/ReplicasManager.java | 48 +++++++++++++++++++++-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 4 ++
.../broker/controller/ReplicasManagerTest.java | 1 +
.../apache/rocketmq/remoting/RemotingClient.java | 2 +
.../remoting/netty/NettyRemotingClient.java | 14 +++++++
6 files changed, 69 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8b3dbdfa9..a96aa0405 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1739,7 +1739,7 @@ public class BrokerController {
protected void sendHeartbeat() {
if (this.brokerConfig.isEnableControllerMode()) {
- final List<String> controllerAddresses = this.replicasManager.getControllerAddresses();
+ final List<String> controllerAddresses = this.replicasManager.getAvailableControllerAddresses();
for (String controllerAddress : controllerAddresses) {
if (StringUtils.isNotEmpty(controllerAddress)) {
this.brokerOuterAPI.sendHeartbeatToController(
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index dd1c4385e..677faca02 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -17,14 +17,19 @@
package org.apache.rocketmq.broker.controller;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
@@ -60,12 +65,14 @@ public class ReplicasManager {
private final ScheduledExecutorService scheduledService;
private final ExecutorService executorService;
+ private final ExecutorService scanExecutor;
private final BrokerController brokerController;
private final AutoSwitchHAService haService;
private final BrokerConfig brokerConfig;
private final String localAddress;
private final BrokerOuterAPI brokerOuterAPI;
private List<String> controllerAddresses;
+ private final ConcurrentMap<String, Boolean> availableControllerAddresses;
private volatile String controllerLeaderAddress = "";
private volatile State state = State.INITIAL;
@@ -84,8 +91,11 @@ public class ReplicasManager {
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+ this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
+ this.availableControllerAddresses = new ConcurrentHashMap<>();
this.syncStateSet = new HashSet<>();
this.localAddress = brokerController.getBrokerAddr();
this.haService.setLocalAddress(this.localAddress);
@@ -104,7 +114,9 @@ public class ReplicasManager {
public void start() {
updateControllerAddr();
+ scanAvailableControllerAddresses();
this.scheduledService.scheduleAtFixedRate(this::updateControllerAddr, 2 * 60 * 1000, 2 * 60 * 1000, TimeUnit.MILLISECONDS);
+ this.scheduledService.scheduleAtFixedRate(this::scanAvailableControllerAddresses, 3 * 1000, 3 * 1000, TimeUnit.MILLISECONDS);
if (!startBasicService()) {
LOGGER.error("Failed to start replicasManager");
this.executorService.submit(() -> {
@@ -390,7 +402,7 @@ public class ReplicasManager {
* Update controller leader address by rpc.
*/
private boolean updateControllerMetadata() {
- for (String address : this.controllerAddresses) {
+ for (String address : this.availableControllerAddresses.keySet()) {
try {
final GetMetaDataResponseHeader responseHeader = this.brokerOuterAPI.getControllerMetaData(address);
if (responseHeader != null && StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
@@ -445,6 +457,36 @@ public class ReplicasManager {
}
}
+ private void scanAvailableControllerAddresses() {
+ if (controllerAddresses == null) {
+ LOGGER.warn("scanAvailableControllerAddresses addresses of controller is null!");
+ return;
+ }
+
+ for (String address : availableControllerAddresses.keySet()) {
+ if (!controllerAddresses.contains(address)) {
+ LOGGER.warn("scanAvailableControllerAddresses remove invalid address {}", address);
+ availableControllerAddresses.remove(address);
+ }
+ }
+
+ for (String address : controllerAddresses) {
+ scanExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ if (brokerOuterAPI.checkAddressReachable(address)) {
+ availableControllerAddresses.putIfAbsent(address, true);
+ } else {
+ Boolean value = availableControllerAddresses.remove(address);
+ if (value != null) {
+ LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address);
+ }
+ }
+ }
+ });
+ }
+ }
+
private void updateControllerAddr() {
if (brokerConfig.isFetchControllerAddrByDnsLookup()) {
this.controllerAddresses = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr());
@@ -491,4 +533,8 @@ public class ReplicasManager {
public List<EpochEntry> getEpochEntries() {
return this.haService.getEpochEntries();
}
+
+ public List<String> getAvailableControllerAddresses() {
+ return new ArrayList<>(availableControllerAddresses.keySet());
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 0ae1fe5ab..689e060d8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -201,6 +201,10 @@ public class BrokerOuterAPI {
return addressList;
}
+ public boolean checkAddressReachable(String address) {
+ return this.remotingClient.isAddressReachable(address);
+ }
+
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 01eacf43b..ca7df5691 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,6 +123,7 @@ public class ReplicasManagerTest {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
+ when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index cc92efc4a..5c3766b2d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -52,5 +52,7 @@ public interface RemotingClient extends RemotingService {
boolean isChannelWritable(final String addr);
+ boolean isAddressReachable(final String addr);
+
void closeChannels(final List<String> addrList);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 94acf0288..8dddb4e35 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -809,6 +809,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return true;
}
+ @Override
+ public boolean isAddressReachable(String addr) {
+ if (addr == null || addr.isEmpty()) {
+ return false;
+ }
+ try {
+ Channel channel = getAndCreateChannel(addr);
+ return channel != null && channel.isActive();
+ } catch (Exception e) {
+ LOGGER.warn("Get and create channel of {} failed", addr, e);
+ return false;
+ }
+ }
+
@Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();