You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/21 06:00:07 UTC
[rocketmq] branch develop updated: [ISSUE #6382] Periodically check for inactive masters (#6383)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1e2a30126 [ISSUE #6382] Periodically check for inactive masters (#6383)
1e2a30126 is described below
commit 1e2a301266a41e7b4088f6f956dd850df407a600
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Tue Mar 21 13:59:53 2023 +0800
[ISSUE #6382] Periodically check for inactive masters (#6383)
* feat(controller): add concurrency protection for ReplicasInfoManager#replicaInfoTable and ReplicasInfoManager#syncStateSetInfoTable
1. add concurrency protection for ReplicasInfoManager#replicaInfoTable and
ReplicasInfoManager#syncStateSetInfoTable
* docs(controller): make statemachine thread-safe and support timed checking inactive master in each broker-set
1. make statemachine thread-safe
2. support timed checking inactive
master in each broker-set
* fix(controller): Add some logic when visit the BrokerReplicaInfo to avoid NPE
1. Add some logic when visit the BrokerReplicaInfo to avoid NPE
* test(controller): polish some test logic
1. polish some test logic
* fix(controller): Revert ControllerConfig#controllerStorePath to avoid incompatibility problems
1. Revert ControllerConfig#controllerStorePath to avoid
incompatibility problems
---
.../apache/rocketmq/common/ControllerConfig.java | 14 +++++
.../controller/BrokerHeartbeatManager.java | 10 +---
.../org/apache/rocketmq/controller/Controller.java | 8 +++
.../rocketmq/controller/ControllerManager.java | 52 +++++++++++------
.../controller/helper/BrokerLifecycleListener.java | 25 +++++++++
.../controller/impl/DLedgerController.java | 62 +++++++++++++++++++--
.../heartbeat/DefaultBrokerHeartbeatManager.java | 3 +-
.../controller/impl/manager/BrokerReplicaInfo.java | 23 +++++---
.../impl/manager/ReplicasInfoManager.java | 41 ++++++++++----
.../controller/impl/manager/SyncStateInfo.java | 31 ++++-------
.../controller/impl/DLedgerControllerTest.java | 65 +++++++++++++++++++---
.../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
12 files changed, 256 insertions(+), 80 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index 942c03874..b35198fc6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -25,6 +25,7 @@ public class ControllerConfig {
/**
* Interval of periodic scanning for non-active broker;
+ * Unit: millisecond
*/
private long scanNotActiveBrokerInterval = 5 * 1000;
@@ -58,6 +59,11 @@ public class ControllerConfig {
* Whether notify broker when its role changed
*/
private volatile boolean notifyBrokerRoleChanged = true;
+ /**
+ * Interval of periodic scanning for non-active master in each broker-set;
+ * Unit: millisecond
+ */
+ private long scanInactiveMasterInterval = 5 * 1000;
public String getRocketmqHome() {
return rocketmqHome;
@@ -162,4 +168,12 @@ public class ControllerConfig {
public void setNotifyBrokerRoleChanged(boolean notifyBrokerRoleChanged) {
this.notifyBrokerRoleChanged = notifyBrokerRoleChanged;
}
+
+ public long getScanInactiveMasterInterval() {
+ return scanInactiveMasterInterval;
+ }
+
+ public void setScanInactiveMasterInterval(long scanInactiveMasterInterval) {
+ this.scanInactiveMasterInterval = scanInactiveMasterInterval;
+ }
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 3a7dcaf58..71b274c09 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.controller;
import io.netty.channel.Channel;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
public interface BrokerHeartbeatManager {
@@ -46,7 +47,7 @@ public interface BrokerHeartbeatManager {
/**
* Add BrokerLifecycleListener.
*/
- void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
+ void registerBrokerLifecycleListener(final BrokerLifecycleListener listener);
/**
* Broker channel close
@@ -62,11 +63,4 @@ public interface BrokerHeartbeatManager {
* Check whether broker active
*/
boolean isBrokerActive(final String clusterName, final String brokerName, final Long brokerId);
-
- interface BrokerLifecycleListener {
- /**
- * Trigger when broker inactive.
- */
- void onBrokerInactive(final String clusterName, final String brokerName, final Long brokerId);
- }
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 2c0372fec..cda613091 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.controller;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
@@ -109,6 +111,12 @@ public interface Controller {
*/
CompletableFuture<RemotingCommand> getSyncStateData(final List<String> brokerNames);
+ /**
+ * Add broker's lifecycle listener
+ * @param listener listener
+ */
+ void registerBrokerLifecycleListener(final BrokerLifecycleListener listener);
+
/**
* Get the remotingServer used by the controller, the upper layer will reuse this remotingServer.
*/
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index a9949bde0..46826517c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -117,7 +117,8 @@ public class ControllerManager {
this.heartbeatManager.initialize();
// Register broker inactive listener
- this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
+ this.heartbeatManager.registerBrokerLifecycleListener(this::onBrokerInactive);
+ this.controller.registerBrokerLifecycleListener(this::onBrokerInactive);
registerProcessor();
return true;
}
@@ -128,34 +129,49 @@ public class ControllerManager {
*
* @param clusterName The cluster name of this inactive broker
* @param brokerName The inactive broker name
- * @param brokerId The inactive broker id
+ * @param brokerId The inactive broker id; null means that an election is forced to be triggered for this broker-set
*/
private void onBrokerInactive(String clusterName, String brokerName, Long brokerId) {
if (controller.isLeaderState()) {
- try {
- final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
- final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS);
+ if (brokerId == null) {
+ // Means that force triggering election for this broker-set
+ triggerElectMaster(brokerName);
+ return;
+ }
+ final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
+ replicaInfoFuture.whenCompleteAsync((replicaInfoResponse, err) -> {
+ if (err != null || replicaInfoResponse == null) {
+ log.error("Failed to get replica-info for broker-set: {} when OnBrokerInactive", brokerName, err);
+ return;
+ }
final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
// Not master broker offline
if (!brokerId.equals(replicaInfoResponseHeader.getMasterBrokerId())) {
- log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName);
+ log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", brokerId, brokerName);
return;
}
+ // Trigger election
+ triggerElectMaster(brokerName);
+ });
+ } else {
+ log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", brokerId, brokerName);
+ }
+ }
- final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
- final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
- if (electMasterResponse.getCode() == ResponseCode.SUCCESS) {
- log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, electMasterResponse);
- if (controllerConfig.isNotifyBrokerRoleChanged()) {
- notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse));
- }
+ private void triggerElectMaster(String brokerName) {
+ final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
+ electMasterFuture.whenCompleteAsync((electMasterResponse, err) -> {
+ if (err != null || electMasterResponse == null) {
+ log.error("Failed to trigger elect-master in broker-set: {}", brokerName, err);
+ return;
+ }
+ if (electMasterResponse.getCode() == ResponseCode.SUCCESS) {
+ log.info("Elect a new master in broker-set: {} done, result: {}", brokerName, electMasterResponse);
+ if (controllerConfig.isNotifyBrokerRoleChanged()) {
+ notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse));
}
- } catch (Exception e) {
- log.error("", e);
}
- } else {
- log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName);
- }
+ });
}
/**
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java
new file mode 100644
index 000000000..31fa47632
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.helper;
+
+public interface BrokerLifecycleListener {
+ /**
+ * Trigger when broker inactive.
+ */
+ void onBrokerInactive(final String clusterName, final String brokerName, final Long brokerId);
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 491cb16d1..b6007fe09 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -32,6 +32,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
@@ -80,6 +83,11 @@ public class DLedgerController implements Controller {
private final EventSerializer eventSerializer;
private final RoleChangeHandler roleHandler;
private final DLedgerControllerStateMachine statemachine;
+ private final ScheduledExecutorService scanInactiveMasterService;
+
+ private ScheduledFuture scanInactiveMasterFuture;
+
+ private List<BrokerLifecycleListener> brokerLifecycleListeners;
// Usr for checking whether the broker is alive
private BrokerValidPredicate brokerAlivePredicate;
@@ -116,6 +124,8 @@ public class DLedgerController implements Controller {
this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
this.dLedgerServer.registerStateMachine(this.statemachine);
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+ this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
+ this.brokerLifecycleListeners = new ArrayList<>();
}
@Override
@@ -125,6 +135,7 @@ public class DLedgerController implements Controller {
@Override
public void shutdown() {
+ this.cancelScanInactiveFuture();
this.dLedgerServer.shutdown();
}
@@ -193,6 +204,11 @@ public class DLedgerController implements Controller {
() -> this.replicasInfoManager.getSyncStateData(brokerNames, brokerAlivePredicate), false);
}
+ @Override
+ public void registerBrokerLifecycleListener(BrokerLifecycleListener listener) {
+ this.brokerLifecycleListeners.add(listener);
+ }
+
@Override
public RemotingCommand getControllerMetadata() {
final MemberState state = getMemberState();
@@ -218,19 +234,40 @@ public class DLedgerController implements Controller {
() -> this.replicasInfoManager.cleanBrokerData(requestHeader, this.brokerAlivePredicate), true);
}
+ /**
+ * Scan all broker-sets in the statemachine and find each broker-set whose
+ * master has timed out but which still has at least one broker alive with the controller,
+ * then trigger an election for it to update its state.
+ */
+ private void scanInactiveMasterAndTriggerReelect() {
+ if (!this.roleHandler.isLeaderState()) {
+ cancelScanInactiveFuture();
+ return;
+ }
+ List<String> brokerSets = this.replicasInfoManager.scanNeedReelectBrokerSets(this.brokerAlivePredicate);
+ for (String brokerName : brokerSets) {
+ // Notify ControllerManager
+ this.brokerLifecycleListeners.forEach(listener -> listener.onBrokerInactive(null, brokerName, null));
+ }
+ }
+
/**
* Append the request to DLedger, and wait for DLedger to commit the request.
*/
- private boolean appendToDLedgerAndWait(final AppendEntryRequest request) throws Throwable {
+ private boolean appendToDLedgerAndWait(final AppendEntryRequest request) {
if (request != null) {
request.setGroup(this.dLedgerConfig.getGroup());
request.setRemoteId(this.dLedgerConfig.getSelfId());
-
- final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
- if (dLedgerFuture.getPos() == -1) {
+ try {
+ final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+ if (dLedgerFuture.getPos() == -1) {
+ return false;
+ }
+ dLedgerFuture.get(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to append entry to DLedger", e);
return false;
}
- dLedgerFuture.get(5, TimeUnit.SECONDS);
return true;
}
return false;
@@ -249,6 +286,13 @@ public class DLedgerController implements Controller {
this.electPolicy = electPolicy;
}
+ private void cancelScanInactiveFuture() {
+ if (this.scanInactiveMasterFuture != null) {
+ this.scanInactiveMasterFuture.cancel(true);
+ this.scanInactiveMasterFuture = null;
+ }
+ }
+
/**
* Event handler that handle event
*/
@@ -433,11 +477,13 @@ public class DLedgerController implements Controller {
this.currentRole = MemberState.Role.CANDIDATE;
log.info("Controller {} change role to candidate", this.selfId);
DLedgerController.this.stopScheduling();
+ DLedgerController.this.cancelScanInactiveFuture();
break;
case FOLLOWER:
this.currentRole = MemberState.Role.FOLLOWER;
log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
DLedgerController.this.stopScheduling();
+ DLedgerController.this.cancelScanInactiveFuture();
break;
case LEADER: {
log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
@@ -452,6 +498,12 @@ public class DLedgerController implements Controller {
if (appendToDLedgerAndWait(request)) {
this.currentRole = MemberState.Role.LEADER;
DLedgerController.this.startScheduling();
+ if (DLedgerController.this.scanInactiveMasterFuture == null) {
+ long scanInactiveMasterInterval = DLedgerController.this.controllerConfig.getScanInactiveMasterInterval();
+ DLedgerController.this.scanInactiveMasterFuture =
+ DLedgerController.this.scanInactiveMasterService.scheduleAtFixedRate(DLedgerController.this::scanInactiveMasterAndTriggerReelect,
+ scanInactiveMasterInterval, scanInactiveMasterInterval, TimeUnit.MILLISECONDS);
+ }
break;
}
} catch (final Throwable e) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index 63b0c2e5f..dc824281b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -99,7 +100,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
}
@Override
- public void addBrokerLifecycleListener(BrokerLifecycleListener listener) {
+ public void registerBrokerLifecycleListener(BrokerLifecycleListener listener) {
this.brokerLifecycleListeners.add(listener);
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index 27eeab80e..f93074793 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.controller.impl.manager;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -34,13 +36,13 @@ public class BrokerReplicaInfo {
// Start from 1
private final AtomicLong nextAssignBrokerId;
- private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
+ private final Map<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
public BrokerReplicaInfo(String clusterName, String brokerName) {
this.clusterName = clusterName;
this.brokerName = brokerName;
this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_BROKER_CONTROLLER_ID);
- this.brokerIdInfo = new HashMap<>();
+ this.brokerIdInfo = new ConcurrentHashMap<>();
}
public void removeBrokerId(final Long brokerId) {
@@ -72,8 +74,8 @@ public class BrokerReplicaInfo {
return new HashSet<>(this.brokerIdInfo.keySet());
}
- public HashMap<Long, String> getBrokerIdTable() {
- HashMap<Long/*brokerId*/, String/*address*/> map = new HashMap<>(this.brokerIdInfo.size());
+ public Map<Long, String> getBrokerIdTable() {
+ Map<Long/*brokerId*/, String/*address*/> map = new HashMap<>(this.brokerIdInfo.size());
this.brokerIdInfo.forEach((id, pair) -> {
map.put(id, pair.getObject1());
});
@@ -81,20 +83,25 @@ public class BrokerReplicaInfo {
}
public String getBrokerAddress(final Long brokerId) {
- if (this.brokerIdInfo.containsKey(brokerId)) {
- return this.brokerIdInfo.get(brokerId).getObject1();
+ if (brokerId == null) return null;
+ Pair<String, String> pair = this.brokerIdInfo.get(brokerId);
+ if (pair != null) {
+ return pair.getObject1();
}
return null;
}
public String getBrokerRegisterCheckCode(final Long brokerId) {
- if (this.brokerIdInfo.containsKey(brokerId)) {
- return this.brokerIdInfo.get(brokerId).getObject2();
+ if (brokerId == null) return null;
+ Pair<String, String> pair = this.brokerIdInfo.get(brokerId);
+ if (pair != null) {
+ return pair.getObject2();
}
return null;
}
public void updateBrokerAddress(final Long brokerId, final String brokerAddress) {
+ if (brokerId == null) return;
Pair<String, String> oldPair = this.brokerIdInfo.get(brokerId);
if (oldPair != null) {
this.brokerIdInfo.put(brokerId, new Pair<>(brokerAddress, oldPair.getObject2()));
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 2f5c3307c..b0a67531d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -19,9 +19,11 @@ package org.apache.rocketmq.controller.impl.manager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -63,8 +65,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.Register
/**
* The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
- * state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
- * be called sequentially
+ * state machine. If the upper layer wants to update the statemachine, it must call its methods sequentially.
*/
public class ReplicasInfoManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -74,8 +75,8 @@ public class ReplicasInfoManager {
public ReplicasInfoManager(final ControllerConfig config) {
this.controllerConfig = config;
- this.replicaInfoTable = new HashMap<>();
- this.syncStateSetInfoTable = new HashMap<>();
+ this.replicaInfoTable = new ConcurrentHashMap<String, BrokerReplicaInfo>();
+ this.syncStateSetInfoTable = new ConcurrentHashMap<String, SyncStateInfo>();
}
public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
@@ -221,7 +222,7 @@ public class ReplicasInfoManager {
response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet);
- BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
+ BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerReplicaInfo);
if (null != brokerMemberGroup) {
responseBody.setBrokerMemberGroup(brokerMemberGroup);
}
@@ -244,12 +245,11 @@ public class ReplicasInfoManager {
return result;
}
- private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
- if (isContainsBroker(brokerName)) {
- final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
- final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName);
- final HashMap<Long, String> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
- final HashMap<Long, String> memberGroup = new HashMap<>();
+ private BrokerMemberGroup buildBrokerMemberGroup(final BrokerReplicaInfo brokerReplicaInfo) {
+ if (brokerReplicaInfo != null) {
+ final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName());
+ final Map<Long, String> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
+ final Map<Long, String> memberGroup = new HashMap<>();
brokerIdTable.forEach((id, addr) -> memberGroup.put(id, addr));
group.setBrokerAddrs(memberGroup);
return group;
@@ -433,6 +433,25 @@ public class ReplicasInfoManager {
return result;
}
+ public List<String/*BrokerName*/> scanNeedReelectBrokerSets(final BrokerValidPredicate validPredicate) {
+ List<String> needReelectBrokerSets = new LinkedList<>();
+ this.syncStateSetInfoTable.forEach((brokerName, syncStateInfo) -> {
+ Long masterBrokerId = syncStateInfo.getMasterBrokerId();
+ String clusterName = syncStateInfo.getClusterName();
+ // Now master is inactive
+ if (masterBrokerId != null && !validPredicate.check(clusterName, brokerName, masterBrokerId)) {
+ // Still at least one broker alive
+ Set<Long> brokerIds = this.replicaInfoTable.get(brokerName).getBrokerIdTable().keySet();
+ boolean alive = brokerIds.stream().anyMatch(id -> validPredicate.check(clusterName, brokerName, id));
+ if (alive) {
+ needReelectBrokerSets.add(brokerName);
+ }
+ }
+ });
+ return needReelectBrokerSets;
+ }
+
+
/**
* Apply events to memory statemachine.
*
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 0951df93a..a01298d9a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.controller.impl.manager;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Manages the syncStateSet of broker replicas.
@@ -26,45 +27,33 @@ import java.util.Set;
public class SyncStateInfo {
private final String clusterName;
private final String brokerName;
+ private final AtomicInteger masterEpoch;
+ private final AtomicInteger syncStateSetEpoch;
private Set<Long/*brokerId*/> syncStateSet;
- private int syncStateSetEpoch;
private Long masterBrokerId;
- private int masterEpoch;
public SyncStateInfo(String clusterName, String brokerName) {
this.clusterName = clusterName;
this.brokerName = brokerName;
- this.masterEpoch = 0;
- this.syncStateSetEpoch = 0;
+ this.masterEpoch = new AtomicInteger(0);
+ this.syncStateSetEpoch = new AtomicInteger(0);
this.syncStateSet = Collections.emptySet();
}
-
- public SyncStateInfo(String clusterName, String brokerName, Long masterBrokerId) {
- this.clusterName = clusterName;
- this.brokerName = brokerName;
- this.masterBrokerId = masterBrokerId;
- this.masterEpoch = 1;
- this.syncStateSet = new HashSet<>();
- this.syncStateSet.add(masterBrokerId);
- this.syncStateSetEpoch = 1;
- }
-
-
public void updateMasterInfo(Long masterBrokerId) {
this.masterBrokerId = masterBrokerId;
- this.masterEpoch++;
+ this.masterEpoch.incrementAndGet();
}
public void updateSyncStateSetInfo(Set<Long> newSyncStateSet) {
this.syncStateSet = new HashSet<>(newSyncStateSet);
- this.syncStateSetEpoch++;
+ this.syncStateSetEpoch.incrementAndGet();
}
public boolean isFirstTimeForElect() {
- return this.masterEpoch == 0;
+ return this.masterEpoch.get() == 0;
}
public boolean isMasterExist() {
@@ -84,7 +73,7 @@ public class SyncStateInfo {
}
public int getSyncStateSetEpoch() {
- return syncStateSetEpoch;
+ return syncStateSetEpoch.get();
}
public Long getMasterBrokerId() {
@@ -92,7 +81,7 @@ public class SyncStateInfo {
}
public int getMasterEpoch() {
- return masterEpoch;
+ return masterEpoch.get();
}
public void removeFromSyncState(final Long brokerId) {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
index eaf78b63d..595a5cb65 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.controller.impl;
-import io.openmessaging.storage.dledger.DLedgerConfig;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
@@ -26,6 +25,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
@@ -62,8 +63,7 @@ public class DLedgerControllerTest {
private List<String> baseDirs;
private List<DLedgerController> controllers;
- public DLedgerController launchController(final String group, final String peers, final String selfId,
- String storeType, final boolean isEnableElectUncleanMaster) {
+ public DLedgerController launchController(final String group, final String peers, final String selfId, final boolean isEnableElectUncleanMaster) {
String tmpdir = System.getProperty("java.io.tmpdir");
final String path = (StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir + File.separator) + group + File.separator + selfId;
baseDirs.add(path);
@@ -75,7 +75,7 @@ public class DLedgerControllerTest {
config.setControllerStorePath(path);
config.setMappedFileSize(10 * 1024 * 1024);
config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
-
+ config.setScanInactiveMasterInterval(1000);
final DLedgerController controller = new DLedgerController(config, (str1, str2, str3) -> true);
controller.startup();
@@ -172,9 +172,9 @@ public class DLedgerControllerTest {
public DLedgerController mockMetaData(boolean enableElectUncleanMaster) throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
- DLedgerController c0 = launchController(group, peers, "n0", DLedgerConfig.MEMORY, enableElectUncleanMaster);
- DLedgerController c1 = launchController(group, peers, "n1", DLedgerConfig.MEMORY, enableElectUncleanMaster);
- DLedgerController c2 = launchController(group, peers, "n2", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+ DLedgerController c0 = launchController(group, peers, "n0", enableElectUncleanMaster);
+ DLedgerController c1 = launchController(group, peers, "n1", enableElectUncleanMaster);
+ DLedgerController c2 = launchController(group, peers, "n2", enableElectUncleanMaster);
controllers.add(c0);
controllers.add(c1);
controllers.add(c2);
@@ -236,6 +236,57 @@ public class DLedgerControllerTest {
assertNotEquals(DEFAULT_IP[0], response.getMasterAddress());
}
+ @Test
+ public void testBrokerLifecycleListener() throws Exception {
+ final DLedgerController leader = mockMetaData(false);
+ // Mock that the master broker has become inactive, and try to elect a new master from the sync-state-set.
+ // But we shut down two controllers, so appending the ElectMasterEvent to DLedger will fail.
+ // So the statemachine still keeps the stale master's information.
+ List<DLedgerController> removed = controllers.stream().filter(controller -> controller != leader).collect(Collectors.toList());
+ for (DLedgerController dLedgerController : removed) {
+ dLedgerController.shutdown();
+ controllers.remove(dLedgerController);
+ }
+ final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
+ setBrokerElectPolicy(leader, 1L);
+ Exception exception = null;
+ try {
+ leader.electMaster(request).get(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ exception = e;
+ }
+ assertNotNull(exception);
+ // Shut down leader controller
+ leader.shutdown();
+ controllers.remove(leader);
+ // Restart two controller
+ for (DLedgerController controller : removed) {
+ if (controller != leader) {
+ ControllerConfig config = controller.getControllerConfig();
+ DLedgerController newController = launchController(config.getControllerDLegerGroup(), config.getControllerDLegerPeers(), config.getControllerDLegerSelfId(), false);
+ controllers.add(newController);
+ newController.startup();
+ }
+ }
+ DLedgerController newLeader = waitLeader(controllers);
+ setBrokerAlivePredicate(newLeader, 1L);
+ // Check if the statemachine is stale
+ final RemotingCommand resp = newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).
+ get(10, TimeUnit.SECONDS);
+ final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) resp.readCustomHeader();
+ assertEquals(1, replicaInfo.getMasterBrokerId().longValue());
+ assertEquals(1, replicaInfo.getMasterEpoch().intValue());
+
+ // Register broker's lifecycle listener
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ newLeader.registerBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
+ assertEquals(DEFAULT_BROKER_NAME, brokerName);
+ atomicBoolean.set(true);
+ });
+ Thread.sleep(2000);
+ assertTrue(atomicBoolean.get());
+ }
+
@Test
public void testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws Exception {
final DLedgerController leader = mockMetaData(false);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index b97ea3249..395f3bab4 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -41,7 +41,7 @@ public class DefaultBrokerHeartbeatManagerTest {
@Test
public void testDetectBrokerAlive() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
- this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
+ this.heartbeatManager.registerBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
latch.countDown();
});
this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:7000", 1L,3000L, null,