You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2023/01/30 08:01:35 UTC

[rocketmq] branch develop updated: [ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)

This is an automated email from the ASF dual-hosted git repository.

huitong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1e7c861c1 [ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)
1e7c861c1 is described below

commit 1e7c861c1e824ed1d7d2b1002c7078f1ece35e27
Author: rongtong <ji...@163.com>
AuthorDate: Mon Jan 30 16:01:28 2023 +0800

    [ISSUE #5929] Fix the issue that broker send heartbeat to controller frequently causing thread blocking when the network partition (#5930)
    
    * Fix the issue that the broker sends heartbeats to the controller too frequently, causing thread blocking during a network partition
    
    * Fix ReplicasManagerTest so that it passes
    
    * Resolve conflict with #5922
    
    * Fix code style
    
    * Rename isAddressCanConnect to isAddressReachable
    
    * Rename isAddressCanConnect to isAddressReachable
---
 .../apache/rocketmq/broker/BrokerController.java   |  2 +-
 .../broker/controller/ReplicasManager.java         | 48 +++++++++++++++++++++-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  4 ++
 .../broker/controller/ReplicasManagerTest.java     |  1 +
 .../apache/rocketmq/remoting/RemotingClient.java   |  2 +
 .../remoting/netty/NettyRemotingClient.java        | 14 +++++++
 6 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8b3dbdfa9..a96aa0405 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1739,7 +1739,7 @@ public class BrokerController {
 
     protected void sendHeartbeat() {
         if (this.brokerConfig.isEnableControllerMode()) {
-            final List<String> controllerAddresses = this.replicasManager.getControllerAddresses();
+            final List<String> controllerAddresses = this.replicasManager.getAvailableControllerAddresses();
             for (String controllerAddress : controllerAddresses) {
                 if (StringUtils.isNotEmpty(controllerAddress)) {
                     this.brokerOuterAPI.sendHeartbeatToController(
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index dd1c4385e..677faca02 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -17,14 +17,19 @@
 
 package org.apache.rocketmq.broker.controller;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
@@ -60,12 +65,14 @@ public class ReplicasManager {
 
     private final ScheduledExecutorService scheduledService;
     private final ExecutorService executorService;
+    private final ExecutorService scanExecutor;
     private final BrokerController brokerController;
     private final AutoSwitchHAService haService;
     private final BrokerConfig brokerConfig;
     private final String localAddress;
     private final BrokerOuterAPI brokerOuterAPI;
     private List<String> controllerAddresses;
+    private final ConcurrentMap<String, Boolean> availableControllerAddresses;
 
     private volatile String controllerLeaderAddress = "";
     private volatile State state = State.INITIAL;
@@ -84,8 +91,11 @@ public class ReplicasManager {
         this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
         this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
         this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+        this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
         this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
         this.brokerConfig = brokerController.getBrokerConfig();
+        this.availableControllerAddresses = new ConcurrentHashMap<>();
         this.syncStateSet = new HashSet<>();
         this.localAddress = brokerController.getBrokerAddr();
         this.haService.setLocalAddress(this.localAddress);
@@ -104,7 +114,9 @@ public class ReplicasManager {
 
     public void start() {
         updateControllerAddr();
+        scanAvailableControllerAddresses();
         this.scheduledService.scheduleAtFixedRate(this::updateControllerAddr, 2 * 60 * 1000, 2 * 60 * 1000, TimeUnit.MILLISECONDS);
+        this.scheduledService.scheduleAtFixedRate(this::scanAvailableControllerAddresses, 3 * 1000, 3 * 1000, TimeUnit.MILLISECONDS);
         if (!startBasicService()) {
             LOGGER.error("Failed to start replicasManager");
             this.executorService.submit(() -> {
@@ -390,7 +402,7 @@ public class ReplicasManager {
      * Update controller leader address by rpc.
      */
     private boolean updateControllerMetadata() {
-        for (String address : this.controllerAddresses) {
+        for (String address : this.availableControllerAddresses.keySet()) {
             try {
                 final GetMetaDataResponseHeader responseHeader = this.brokerOuterAPI.getControllerMetaData(address);
                 if (responseHeader != null && StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
@@ -445,6 +457,36 @@ public class ReplicasManager {
         }
     }
 
+    private void scanAvailableControllerAddresses() {
+        if (controllerAddresses == null) {
+            LOGGER.warn("scanAvailableControllerAddresses addresses of controller is null!");
+            return;
+        }
+
+        for (String address : availableControllerAddresses.keySet()) {
+            if (!controllerAddresses.contains(address)) {
+                LOGGER.warn("scanAvailableControllerAddresses remove invalid address {}", address);
+                availableControllerAddresses.remove(address);
+            }
+        }
+
+        for (String address : controllerAddresses) {
+            scanExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    if (brokerOuterAPI.checkAddressReachable(address)) {
+                        availableControllerAddresses.putIfAbsent(address, true);
+                    } else {
+                        Boolean value = availableControllerAddresses.remove(address);
+                        if (value != null) {
+                            LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
     private void updateControllerAddr() {
         if (brokerConfig.isFetchControllerAddrByDnsLookup()) {
             this.controllerAddresses = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr());
@@ -491,4 +533,8 @@ public class ReplicasManager {
     public List<EpochEntry> getEpochEntries() {
         return this.haService.getEpochEntries();
     }
+
+    public List<String> getAvailableControllerAddresses() {
+        return new ArrayList<>(availableControllerAddresses.keySet());
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 0ae1fe5ab..689e060d8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -201,6 +201,10 @@ public class BrokerOuterAPI {
         return addressList;
     }
 
+    public boolean checkAddressReachable(String address) {
+        return this.remotingClient.isAddressReachable(address);
+    }
+
     public void updateNameServerAddressList(final String addrs) {
         String[] addrArray = addrs.split(";");
         List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 01eacf43b..ca7df5691 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,6 +123,7 @@ public class ReplicasManagerTest {
         when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
         when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
         when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
+        when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
         when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
         replicasManager = new ReplicasManager(brokerController);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index cc92efc4a..5c3766b2d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -52,5 +52,7 @@ public interface RemotingClient extends RemotingService {
 
     boolean isChannelWritable(final String addr);
 
+    boolean isAddressReachable(final String addr);
+
     void closeChannels(final List<String> addrList);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 94acf0288..8dddb4e35 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -809,6 +809,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         return true;
     }
 
+    @Override
+    public boolean isAddressReachable(String addr) {
+        if (addr == null || addr.isEmpty()) {
+            return false;
+        }
+        try {
+            Channel channel = getAndCreateChannel(addr);
+            return channel != null && channel.isActive();
+        } catch (Exception e) {
+            LOGGER.warn("Get and create channel of {} failed", addr, e);
+            return false;
+        }
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return this.namesrvAddrList.get();